blob: 16fe62918a527af14e16b5019973cf261c977215 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Results are written relative to this directory by default; main() replaces it
# with the results_dir command-line argument.
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into an execution directory by the three phases of a
# job (run, crashinfo collection, results parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# Module-level singletons, populated during startup (init() and
# main_without_exception_handling()).
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The .ini file reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
def main():
    """Entry point: run the scheduler, logging any exception that escapes.

    The bare except intentionally catches everything (including SystemExit)
    so the failure is recorded before the exception is re-raised.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
84
85
def main_without_exception_handling():
    """Parse the command line, initialize global state and run the dispatch
    loop until a shutdown is requested (see handle_sigint).

    Mutates the module globals RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode and _base_url before starting the
    status server and the Dispatcher.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # The config value is a list separated by any of whitespace , ; or :
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Log the failure but still fall through to the orderly shutdown below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000161
162
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown of the dispatch loop.

    Only sets a flag; the main loop in main_without_exception_handling()
    notices it on the next tick and shuts down cleanly.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def init(logfile):
    """One-time scheduler startup.

    Sets up log redirection, writes the daemon pidfile, connects the global
    database handle, enables Django autocommit, installs the SIGINT handler
    and initializes the global drone manager.

    @param logfile - Optional path; when set, stdout/stderr are redirected
            to it via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Named drone_hostnames (not "drones") to avoid shadowing the imported
    # autotest_lib.scheduler.drones module.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def enable_logging(logfile):
    """Redirect this process's stdout/stderr into logfile and logfile.err.

    Both files are opened unbuffered in append mode; the underlying file
    descriptors are duplicated with os.dup2 so that output from child
    processes inheriting fds 1/2 is captured as well.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000214
215
showard87ba02a2009-04-20 19:37:32 +0000216def _autoserv_command_line(machines, results_dir, extra_args, job=None,
217 queue_entry=None):
218 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
219 '-r', _drone_manager.absolute_path(results_dir)]
220 if job or queue_entry:
221 if not job:
222 job = queue_entry.job
223 autoserv_argv += ['-u', job.owner, '-l', job.name]
224 return autoserv_argv + extra_args
225
226
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    For example, when a host carries more than one atomic group label
    (see HostScheduler._get_host_atomic_group_id).
    """
229
230
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots host, ACL, label and job-dependency state from the
    database; the find_eligible_* methods then hand out hosts from that
    snapshot, popping each host from the internal pool as it is consumed so
    no host is assigned twice within one scheduling cycle.
    """

    def _get_ready_hosts(self):
        """@returns {host_id: Host} for every unlocked, Ready, inactive host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Format id_list as a comma-separated string for a SQL IN clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (containing a %s placeholder for an id list) and build
        a {left_id: set(right_ids)} mapping from the two-column result.

        @param flip - If True, swap the roles of the two columns.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Convert two-column rows into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns {job_id: set(aclgroup ids the job owner belongs to)}."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns {job_id: set(host ids the job must not run on)}."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns {job_id: set(label ids the job depends on)}."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns {host_id: set(aclgroup ids containing the host)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns ({label_id: set(host_ids)}, {host_id: set(label_ids)})."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all scheduling state for this cycle from the database."""
        self._hosts_available = self._get_ready_hosts()

        # Only load ACL/ineligibility/dependency data for jobs that actually
        # have pending entries this cycle.
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested through its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes the ACL, dependency, only_if_needed and
        atomic group checks for the entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's explicitly requested host, if still eligible.

        @returns the Host, or None if it is ineligible or already used.
        """
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still in this cycle's pool and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label.

        @returns a Host, or None when no eligible host is available.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for this (non-atomic) queue entry, if any."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
575
576
showard170873e2009-01-07 00:22:26 +0000577class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000578 def __init__(self):
579 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000580 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000581 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000582 user_cleanup_time = scheduler_config.config.clean_interval
583 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
584 _db, user_cleanup_time)
585 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000586 self._host_agents = {}
587 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000588
mbligh36768f02008-02-22 18:28:33 +0000589
showard915958d2009-04-22 21:00:58 +0000590 def initialize(self, recover_hosts=True):
591 self._periodic_cleanup.initialize()
592 self._24hr_upkeep.initialize()
593
jadmanski0afbb632008-06-06 21:10:57 +0000594 # always recover processes
595 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000596
jadmanski0afbb632008-06-06 21:10:57 +0000597 if recover_hosts:
598 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000599
600
    def tick(self):
        """Run one scheduler iteration: refresh drone state, run periodic
        cleanups, abort entries flagged for abort, schedule new jobs, poll
        running agents, then flush drone actions and queued email."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000609
showard97aed502008-11-04 02:01:24 +0000610
mblighf3294cc2009-04-08 21:17:38 +0000611 def _run_cleanup(self):
612 self._periodic_cleanup.run_cleanup_maybe()
613 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000614
mbligh36768f02008-02-22 18:28:33 +0000615
showard170873e2009-01-07 00:22:26 +0000616 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
617 for object_id in object_ids:
618 agent_dict.setdefault(object_id, set()).add(agent)
619
620
621 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
622 for object_id in object_ids:
623 assert object_id in agent_dict
624 agent_dict[object_id].remove(agent)
625
626
jadmanski0afbb632008-06-06 21:10:57 +0000627 def add_agent(self, agent):
628 self._agents.append(agent)
629 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000630 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
631 self._register_agent_for_ids(self._queue_entry_agents,
632 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000633
showard170873e2009-01-07 00:22:26 +0000634
635 def get_agents_for_entry(self, queue_entry):
636 """
637 Find agents corresponding to the specified queue_entry.
638 """
showardd3dc1992009-04-22 21:01:40 +0000639 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000640
641
642 def host_has_agent(self, host):
643 """
644 Determine if there is currently an Agent present using this host.
645 """
646 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000647
648
jadmanski0afbb632008-06-06 21:10:57 +0000649 def remove_agent(self, agent):
650 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000651 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
652 agent)
653 self._unregister_agent_for_ids(self._queue_entry_agents,
654 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000655
656
    def _recover_processes(self):
        """Reconnect to autoserv processes left over from a prior scheduler
        run, requeue entries whose processes are gone, and schedule
        reverification of remaining hosts.  Order matters: pidfiles must be
        registered before the drone refresh that reads them."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000667
showard170873e2009-01-07 00:22:26 +0000668
669 def _register_pidfiles(self):
670 # during recovery we may need to read pidfiles for both running and
671 # parsing entries
672 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000673 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000674 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000675 for pidfile_name in _ALL_PIDFILE_NAMES:
676 pidfile_id = _drone_manager.get_pidfile_id_from(
677 queue_entry.execution_tag(), pidfile_name=pidfile_name)
678 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000679
680
showardd3dc1992009-04-22 21:01:40 +0000681 def _recover_entries_with_status(self, status, orphans, pidfile_name,
682 recover_entries_fn):
683 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000684 for queue_entry in queue_entries:
685 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000686 # synchronous job we've already recovered
687 continue
showardd3dc1992009-04-22 21:01:40 +0000688 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000689 execution_tag = queue_entry.execution_tag()
690 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000691 run_monitor.attach_to_existing_process(execution_tag,
692 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000693 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000694 # execution apparently never happened
695 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000696 continue
mbligh90a549d2008-03-25 23:52:34 +0000697
showardd3dc1992009-04-22 21:01:40 +0000698 logging.info('Recovering %s entry %s (process %s)',
699 status.lower(),
700 ', '.join(str(entry) for entry in queue_entries),
701 run_monitor.get_process())
702 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
703 orphans.discard(run_monitor.get_process())
704
705
706 def _kill_remaining_orphan_processes(self, orphans):
707 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000708 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000709 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000710
showard170873e2009-01-07 00:22:26 +0000711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_running_entries(self, orphans):
713 def recover_entries(job, queue_entries, run_monitor):
714 if run_monitor is not None:
715 queue_task = RecoveryQueueTask(job=job,
716 queue_entries=queue_entries,
717 run_monitor=run_monitor)
718 self.add_agent(Agent(tasks=[queue_task],
719 num_processes=len(queue_entries)))
720 # else, _requeue_other_active_entries will cover this
721
722 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
723 orphans, '.autoserv_execute',
724 recover_entries)
725
726
727 def _recover_gathering_entries(self, orphans):
728 def recover_entries(job, queue_entries, run_monitor):
729 gather_task = GatherLogsTask(job, queue_entries,
730 run_monitor=run_monitor)
731 self.add_agent(Agent([gather_task]))
732
733 self._recover_entries_with_status(
734 models.HostQueueEntry.Status.GATHERING,
735 orphans, _CRASHINFO_PID_FILE, recover_entries)
736
737
738 def _recover_parsing_entries(self, orphans):
739 def recover_entries(job, queue_entries, run_monitor):
740 reparse_task = FinalReparseTask(queue_entries,
741 run_monitor=run_monitor)
742 self.add_agent(Agent([reparse_task], num_processes=0))
743
744 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
745 orphans, _PARSER_PID_FILE,
746 recover_entries)
747
748
    def _recover_all_recoverable_entries(self):
        """
        Recover running/gathering/parsing entries.  Each phase discards the
        processes it re-attaches to from the orphan set; whatever remains
        afterwards has no owner and is killed.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000755
showard97aed502008-11-04 02:01:24 +0000756
showard170873e2009-01-07 00:22:26 +0000757 def _requeue_other_active_entries(self):
758 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000759 where='active AND NOT complete AND '
760 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000761 for queue_entry in queue_entries:
762 if self.get_agents_for_entry(queue_entry):
763 # entry has already been recovered
764 continue
showardd3dc1992009-04-22 21:01:40 +0000765 if queue_entry.aborted:
766 queue_entry.abort(self)
767 continue
768
769 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000770 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000771 if queue_entry.host:
772 tasks = queue_entry.host.reverify_tasks()
773 self.add_agent(Agent(tasks))
774 agent = queue_entry.requeue()
775
776
    def _reverify_remaining_hosts(self):
        """Put hosts left in transient states back through verification."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000792
793
jadmanski0afbb632008-06-06 21:10:57 +0000794 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000795 print_message='Reverifying host %s'):
796 full_where='locked = 0 AND invalid = 0 AND ' + where
797 for host in Host.fetch(where=full_where):
798 if self.host_has_agent(host):
799 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000800 continue
showard170873e2009-01-07 00:22:26 +0000801 if print_message:
showardb18134f2009-03-20 20:52:18 +0000802 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000803 tasks = host.reverify_tasks()
804 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000805
806
jadmanski0afbb632008-06-06 21:10:57 +0000807 def _recover_hosts(self):
808 # recover "Repair Failed" hosts
809 message = 'Reverifying dead host %s'
810 self._reverify_hosts_where("status = 'Repair Failed'",
811 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000812
813
showard04c82c52008-05-29 19:38:12 +0000814
showardb95b1bd2008-08-15 18:11:04 +0000815 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000816 # prioritize by job priority, then non-metahost over metahost, then FIFO
817 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000818 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000819 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000820 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000821
822
showard89f84db2009-03-12 20:39:13 +0000823 def _refresh_pending_queue_entries(self):
824 """
825 Lookup the pending HostQueueEntries and call our HostScheduler
826 refresh() method given that list. Return the list.
827
828 @returns A list of pending HostQueueEntries sorted in priority order.
829 """
showard63a34772008-08-18 19:32:50 +0000830 queue_entries = self._get_pending_queue_entries()
831 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000832 return []
showardb95b1bd2008-08-15 18:11:04 +0000833
showard63a34772008-08-18 19:32:50 +0000834 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000835
showard89f84db2009-03-12 20:39:13 +0000836 return queue_entries
837
838
839 def _schedule_atomic_group(self, queue_entry):
840 """
841 Schedule the given queue_entry on an atomic group of hosts.
842
843 Returns immediately if there are insufficient available hosts.
844
845 Creates new HostQueueEntries based off of queue_entry for the
846 scheduled hosts and starts them all running.
847 """
848 # This is a virtual host queue entry representing an entire
849 # atomic group, find a group and schedule their hosts.
850 group_hosts = self._host_scheduler.find_eligible_atomic_group(
851 queue_entry)
852 if not group_hosts:
853 return
854 # The first assigned host uses the original HostQueueEntry
855 group_queue_entries = [queue_entry]
856 for assigned_host in group_hosts[1:]:
857 # Create a new HQE for every additional assigned_host.
858 new_hqe = HostQueueEntry.clone(queue_entry)
859 new_hqe.save()
860 group_queue_entries.append(new_hqe)
861 assert len(group_queue_entries) == len(group_hosts)
862 for queue_entry, host in itertools.izip(group_queue_entries,
863 group_hosts):
864 self._run_queue_entry(queue_entry, host)
865
866
867 def _schedule_new_jobs(self):
868 queue_entries = self._refresh_pending_queue_entries()
869 if not queue_entries:
870 return
871
showard63a34772008-08-18 19:32:50 +0000872 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 if (queue_entry.atomic_group_id is None or
874 queue_entry.host_id is not None):
875 assigned_host = self._host_scheduler.find_eligible_host(
876 queue_entry)
877 if assigned_host:
878 self._run_queue_entry(queue_entry, assigned_host)
879 else:
880 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000881
882
883 def _run_queue_entry(self, queue_entry, host):
884 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000885 # in some cases (synchronous jobs with run_verify=False), agent may be
886 # None
showard9976ce92008-10-15 20:28:13 +0000887 if agent:
888 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000889
890
jadmanski0afbb632008-06-06 21:10:57 +0000891 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000892 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
893 for agent in self.get_agents_for_entry(entry):
894 agent.abort()
895 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000896
897
showard324bf812009-01-20 23:23:38 +0000898 def _can_start_agent(self, agent, num_started_this_cycle,
899 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000900 # always allow zero-process agents to run
901 if agent.num_processes == 0:
902 return True
903 # don't allow any nonzero-process agents to run after we've reached a
904 # limit (this avoids starvation of many-process agents)
905 if have_reached_limit:
906 return False
907 # total process throttling
showard324bf812009-01-20 23:23:38 +0000908 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000909 return False
910 # if a single agent exceeds the per-cycle throttling, still allow it to
911 # run when it's the first agent in the cycle
912 if num_started_this_cycle == 0:
913 return True
914 # per-cycle throttling
915 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000916 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000917 return False
918 return True
919
920
jadmanski0afbb632008-06-06 21:10:57 +0000921 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000922 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000923 have_reached_limit = False
924 # iterate over copy, so we can remove agents during iteration
925 for agent in list(self._agents):
926 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000927 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000928 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000929 continue
930 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000931 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000932 have_reached_limit):
933 have_reached_limit = True
934 continue
showard4c5374f2008-09-04 17:02:56 +0000935 num_started_this_cycle += agent.num_processes
936 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000937 logging.info('%d running processes',
938 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000939
940
showard170873e2009-01-07 00:22:26 +0000941class PidfileRunMonitor(object):
942 """
943 Client must call either run() to start a new process or
944 attach_to_existing_process().
945 """
mbligh36768f02008-02-22 18:28:33 +0000946
showard170873e2009-01-07 00:22:26 +0000947 class _PidfileException(Exception):
948 """
949 Raised when there's some unexpected behavior with the pid file, but only
950 used internally (never allowed to escape this class).
951 """
mbligh36768f02008-02-22 18:28:33 +0000952
953
showard170873e2009-01-07 00:22:26 +0000954 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000955 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000956 self._start_time = None
957 self.pidfile_id = None
958 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000959
960
showard170873e2009-01-07 00:22:26 +0000961 def _add_nice_command(self, command, nice_level):
962 if not nice_level:
963 return command
964 return ['nice', '-n', str(nice_level)] + command
965
966
967 def _set_start_time(self):
968 self._start_time = time.time()
969
970
971 def run(self, command, working_directory, nice_level=None, log_file=None,
972 pidfile_name=None, paired_with_pidfile=None):
973 assert command is not None
974 if nice_level is not None:
975 command = ['nice', '-n', str(nice_level)] + command
976 self._set_start_time()
977 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000978 command, working_directory, pidfile_name=pidfile_name,
979 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +0000980
981
showardd3dc1992009-04-22 21:01:40 +0000982 def attach_to_existing_process(self, execution_tag,
983 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +0000984 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000985 self.pidfile_id = _drone_manager.get_pidfile_id_from(
986 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000987 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +0000988
989
jadmanski0afbb632008-06-06 21:10:57 +0000990 def kill(self):
showard170873e2009-01-07 00:22:26 +0000991 if self.has_process():
992 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000993
mbligh36768f02008-02-22 18:28:33 +0000994
showard170873e2009-01-07 00:22:26 +0000995 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000996 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000997 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000998
999
showard170873e2009-01-07 00:22:26 +00001000 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001001 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001002 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001003 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001004
1005
showard170873e2009-01-07 00:22:26 +00001006 def _read_pidfile(self, use_second_read=False):
1007 assert self.pidfile_id is not None, (
1008 'You must call run() or attach_to_existing_process()')
1009 contents = _drone_manager.get_pidfile_contents(
1010 self.pidfile_id, use_second_read=use_second_read)
1011 if contents.is_invalid():
1012 self._state = drone_manager.PidfileContents()
1013 raise self._PidfileException(contents)
1014 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001015
1016
showard21baa452008-10-21 00:08:39 +00001017 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001018 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1019 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001020 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001021 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001022 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001023
1024
1025 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001026 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001027 return
mblighbb421852008-03-11 22:36:16 +00001028
showard21baa452008-10-21 00:08:39 +00001029 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001030
showard170873e2009-01-07 00:22:26 +00001031 if self._state.process is None:
1032 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001033 return
mbligh90a549d2008-03-25 23:52:34 +00001034
showard21baa452008-10-21 00:08:39 +00001035 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001036 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001037 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001038 return
mbligh90a549d2008-03-25 23:52:34 +00001039
showard170873e2009-01-07 00:22:26 +00001040 # pid but no running process - maybe process *just* exited
1041 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001042 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001043 # autoserv exited without writing an exit code
1044 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001045 self._handle_pidfile_error(
1046 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001047
showard21baa452008-10-21 00:08:39 +00001048
1049 def _get_pidfile_info(self):
1050 """\
1051 After completion, self._state will contain:
1052 pid=None, exit_status=None if autoserv has not yet run
1053 pid!=None, exit_status=None if autoserv is running
1054 pid!=None, exit_status!=None if autoserv has completed
1055 """
1056 try:
1057 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001058 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001059 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001060
1061
showard170873e2009-01-07 00:22:26 +00001062 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001063 """\
1064 Called when no pidfile is found or no pid is in the pidfile.
1065 """
showard170873e2009-01-07 00:22:26 +00001066 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001067 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001068 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1069 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001070 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001071 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001072
1073
showard35162b02009-03-03 02:17:30 +00001074 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001075 """\
1076 Called when autoserv has exited without writing an exit status,
1077 or we've timed out waiting for autoserv to write a pid to the
1078 pidfile. In either case, we just return failure and the caller
1079 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001080
showard170873e2009-01-07 00:22:26 +00001081 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001082 """
1083 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001084 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001085 self._state.exit_status = 1
1086 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001087
1088
jadmanski0afbb632008-06-06 21:10:57 +00001089 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001090 self._get_pidfile_info()
1091 return self._state.exit_status
1092
1093
1094 def num_tests_failed(self):
1095 self._get_pidfile_info()
1096 assert self._state.num_tests_failed is not None
1097 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001098
1099
mbligh36768f02008-02-22 18:28:33 +00001100class Agent(object):
showard170873e2009-01-07 00:22:26 +00001101 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001102 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001103 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001104 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001105 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001106
showard170873e2009-01-07 00:22:26 +00001107 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1108 for task in tasks)
1109 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1110
showardd3dc1992009-04-22 21:01:40 +00001111 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001112 for task in tasks:
1113 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001114
1115
showardd3dc1992009-04-22 21:01:40 +00001116 def _clear_queue(self):
1117 self.queue = Queue.Queue(0)
1118
1119
showard170873e2009-01-07 00:22:26 +00001120 def _union_ids(self, id_lists):
1121 return set(itertools.chain(*id_lists))
1122
1123
jadmanski0afbb632008-06-06 21:10:57 +00001124 def add_task(self, task):
1125 self.queue.put_nowait(task)
1126 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001127
1128
jadmanski0afbb632008-06-06 21:10:57 +00001129 def tick(self):
showard21baa452008-10-21 00:08:39 +00001130 while not self.is_done():
1131 if self.active_task and not self.active_task.is_done():
1132 self.active_task.poll()
1133 if not self.active_task.is_done():
1134 return
1135 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001136
1137
jadmanski0afbb632008-06-06 21:10:57 +00001138 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001139 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001140 if self.active_task:
1141 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001142 if not self.active_task.success:
1143 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001144
jadmanski0afbb632008-06-06 21:10:57 +00001145 self.active_task = None
1146 if not self.is_done():
1147 self.active_task = self.queue.get_nowait()
1148 if self.active_task:
1149 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001153 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001154 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1155 # get reset.
1156 new_agent = Agent(self.active_task.failure_tasks)
1157 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001158
mblighe2586682008-02-29 22:45:46 +00001159
showard4c5374f2008-09-04 17:02:56 +00001160 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001161 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001162
1163
jadmanski0afbb632008-06-06 21:10:57 +00001164 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001165 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001166
1167
jadmanski0afbb632008-06-06 21:10:57 +00001168 def start(self):
1169 assert self.dispatcher
jadmanski0afbb632008-06-06 21:10:57 +00001170 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001171
jadmanski0afbb632008-06-06 21:10:57 +00001172
showardd3dc1992009-04-22 21:01:40 +00001173 def abort(self):
1174 if self.active_task:
1175 self.active_task.abort()
showard6b733412009-04-27 20:09:18 +00001176 if self.active_task.aborted: # tasks can choose to ignore aborts
1177 self.active_task = None
1178 self._clear_queue()
showardd3dc1992009-04-22 21:01:40 +00001179
1180
mbligh36768f02008-02-22 18:28:33 +00001181class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001182 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1183 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001184 self.done = False
1185 self.failure_tasks = failure_tasks
1186 self.started = False
1187 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001188 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001189 self.task = None
1190 self.agent = None
1191 self.monitor = None
1192 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001193 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001194 self.queue_entry_ids = []
1195 self.host_ids = []
1196 self.log_file = None
1197
1198
1199 def _set_ids(self, host=None, queue_entries=None):
1200 if queue_entries and queue_entries != [None]:
1201 self.host_ids = [entry.host.id for entry in queue_entries]
1202 self.queue_entry_ids = [entry.id for entry in queue_entries]
1203 else:
1204 assert host
1205 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001206
1207
jadmanski0afbb632008-06-06 21:10:57 +00001208 def poll(self):
jadmanski0afbb632008-06-06 21:10:57 +00001209 if self.monitor:
1210 self.tick(self.monitor.exit_code())
1211 else:
1212 self.finished(False)
mbligh36768f02008-02-22 18:28:33 +00001213
1214
jadmanski0afbb632008-06-06 21:10:57 +00001215 def tick(self, exit_code):
showard170873e2009-01-07 00:22:26 +00001216 if exit_code is None:
jadmanski0afbb632008-06-06 21:10:57 +00001217 return
jadmanski0afbb632008-06-06 21:10:57 +00001218 if exit_code == 0:
1219 success = True
1220 else:
1221 success = False
mbligh36768f02008-02-22 18:28:33 +00001222
jadmanski0afbb632008-06-06 21:10:57 +00001223 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001224
1225
jadmanski0afbb632008-06-06 21:10:57 +00001226 def is_done(self):
1227 return self.done
mbligh36768f02008-02-22 18:28:33 +00001228
1229
jadmanski0afbb632008-06-06 21:10:57 +00001230 def finished(self, success):
1231 self.done = True
1232 self.success = success
1233 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001234
1235
jadmanski0afbb632008-06-06 21:10:57 +00001236 def prolog(self):
1237 pass
mblighd64e5702008-04-04 21:39:28 +00001238
1239
jadmanski0afbb632008-06-06 21:10:57 +00001240 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001241 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001242
mbligh36768f02008-02-22 18:28:33 +00001243
jadmanski0afbb632008-06-06 21:10:57 +00001244 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001245 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001246 _drone_manager.copy_to_results_repository(
1247 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001248
1249
jadmanski0afbb632008-06-06 21:10:57 +00001250 def epilog(self):
1251 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001252
1253
jadmanski0afbb632008-06-06 21:10:57 +00001254 def start(self):
1255 assert self.agent
1256
1257 if not self.started:
1258 self.prolog()
1259 self.run()
1260
1261 self.started = True
1262
1263
1264 def abort(self):
1265 if self.monitor:
1266 self.monitor.kill()
1267 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001268 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001269 self.cleanup()
1270
1271
showard170873e2009-01-07 00:22:26 +00001272 def set_host_log_file(self, base_name, host):
1273 filename = '%s.%s' % (time.time(), base_name)
1274 self.log_file = os.path.join('hosts', host.hostname, filename)
1275
1276
showardde634ee2009-01-30 01:44:24 +00001277 def _get_consistent_execution_tag(self, queue_entries):
1278 first_execution_tag = queue_entries[0].execution_tag()
1279 for queue_entry in queue_entries[1:]:
1280 assert queue_entry.execution_tag() == first_execution_tag, (
1281 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1282 queue_entry,
1283 first_execution_tag,
1284 queue_entries[0]))
1285 return first_execution_tag
1286
1287
showard6b733412009-04-27 20:09:18 +00001288 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001289 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001290 if use_monitor is None:
1291 assert self.monitor
1292 use_monitor = self.monitor
1293 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001294 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001295 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001296 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001297 results_path)
showardde634ee2009-01-30 01:44:24 +00001298
1299 reparse_task = FinalReparseTask(queue_entries)
1300 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1301
1302
showardd3dc1992009-04-22 21:01:40 +00001303 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001304 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001305 self.monitor = PidfileRunMonitor()
1306 self.monitor.run(self.cmd, self._working_directory,
1307 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001308 log_file=self.log_file,
1309 pidfile_name=pidfile_name,
1310 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001311
1312
class RepairTask(AgentTask):
    """
    Runs autoserv -R to repair a host.  On success the host becomes Ready;
    on failure it's marked Repair Failed and the associated queue entry
    (if any, and not a metahost) is failed.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # set ids *after* AgentTask.__init__, which resets host_ids and
        # queue_entry_ids to [] (calling it before super(), as this code
        # previously did, clobbered the ids; VerifyTask already uses this
        # ordering).
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # release the entry so it can be rescheduled while this host is
        # being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001373
1374
showard8fe93b52008-11-18 17:53:22 +00001375class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001376 def epilog(self):
1377 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001378 should_copy_results = (self.queue_entry and not self.success
1379 and not self.queue_entry.meta_host)
1380 if should_copy_results:
1381 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001382 destination = os.path.join(self.queue_entry.execution_tag(),
1383 os.path.basename(self.log_file))
1384 _drone_manager.copy_to_results_repository(
1385 self.monitor.get_process(), self.log_file,
1386 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001387
1388
class VerifyTask(PreJobTask):
    """Runs autoserv -v (verify) against a host, repairing it on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host may be passed; the host is taken
        # from the entry when only an entry is given
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # if verify fails, the failure task will attempt a repair
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        # NOTE(review): this passes the raw `host` argument (None when
        # constructed from a queue_entry) and [queue_entry] (containing None
        # when constructed from a host), unlike CleanupTask which resolves
        # the host first -- confirm _set_ids tolerates these None values.
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        # on failure, the RepairTask registered in __init__ takes over;
        # only a successful verify marks the host Ready
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001419
1420
mbligh36768f02008-02-22 18:28:33 +00001421class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001422 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001423 self.job = job
1424 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001425 super(QueueTask, self).__init__(cmd, self._execution_tag())
1426 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001427
1428
showard170873e2009-01-07 00:22:26 +00001429 def _format_keyval(self, key, value):
1430 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001431
1432
showard73ec0442009-02-07 02:05:20 +00001433 def _keyval_path(self):
1434 return os.path.join(self._execution_tag(), 'keyval')
1435
1436
1437 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1438 keyval_contents = '\n'.join(self._format_keyval(key, value)
1439 for key, value in keyval_dict.iteritems())
1440 # always end with a newline to allow additional keyvals to be written
1441 keyval_contents += '\n'
1442 _drone_manager.attach_file_to_execution(self._execution_tag(),
1443 keyval_contents,
1444 file_path=keyval_path)
1445
1446
1447 def _write_keyvals_before_job(self, keyval_dict):
1448 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1449
1450
1451 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001452 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001453 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001454 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001455 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001456
1457
showard170873e2009-01-07 00:22:26 +00001458 def _write_host_keyvals(self, host):
1459 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1460 host.hostname)
1461 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001462 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1463 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001464
1465
showard170873e2009-01-07 00:22:26 +00001466 def _execution_tag(self):
1467 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001468
1469
jadmanski0afbb632008-06-06 21:10:57 +00001470 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001471 queued = int(time.mktime(self.job.created_on.timetuple()))
1472 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001473 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001474 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001475 queue_entry.set_status('Running')
1476 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001477 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001478 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001479 assert len(self.queue_entries) == 1
1480 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001481
1482
showard35162b02009-03-03 02:17:30 +00001483 def _write_lost_process_error_file(self):
1484 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1485 _drone_manager.write_lines_to_file(error_file_path,
1486 [_LOST_PROCESS_ERROR])
1487
1488
showardd3dc1992009-04-22 21:01:40 +00001489 def _finish_task(self):
1490 # both of these conditionals can be true, iff the process ran, wrote a
1491 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001492 if self.monitor.has_process():
1493 self._write_keyval_after_job("job_finished", int(time.time()))
showardd3dc1992009-04-22 21:01:40 +00001494 gather_task = GatherLogsTask(self.job, self.queue_entries)
1495 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001496
1497 if self.monitor.lost_process:
1498 self._write_lost_process_error_file()
1499 for queue_entry in self.queue_entries:
1500 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001501
1502
showardcbd74612008-11-19 21:42:02 +00001503 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001504 _drone_manager.write_lines_to_file(
1505 os.path.join(self._execution_tag(), 'status.log'),
1506 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001507 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001508
1509
jadmanskif7fa2cc2008-10-01 14:13:23 +00001510 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001511 if not self.monitor or not self.monitor.has_process():
1512 return
1513
jadmanskif7fa2cc2008-10-01 14:13:23 +00001514 # build up sets of all the aborted_by and aborted_on values
1515 aborted_by, aborted_on = set(), set()
1516 for queue_entry in self.queue_entries:
1517 if queue_entry.aborted_by:
1518 aborted_by.add(queue_entry.aborted_by)
1519 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1520 aborted_on.add(t)
1521
1522 # extract some actual, unique aborted by value and write it out
1523 assert len(aborted_by) <= 1
1524 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001525 aborted_by_value = aborted_by.pop()
1526 aborted_on_value = max(aborted_on)
1527 else:
1528 aborted_by_value = 'autotest_system'
1529 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001530
showarda0382352009-02-11 23:36:43 +00001531 self._write_keyval_after_job("aborted_by", aborted_by_value)
1532 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001533
showardcbd74612008-11-19 21:42:02 +00001534 aborted_on_string = str(datetime.datetime.fromtimestamp(
1535 aborted_on_value))
1536 self._write_status_comment('Job aborted by %s on %s' %
1537 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001538
1539
jadmanski0afbb632008-06-06 21:10:57 +00001540 def abort(self):
1541 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001542 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001543 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001544
1545
jadmanski0afbb632008-06-06 21:10:57 +00001546 def epilog(self):
1547 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001548 self._finish_task()
1549 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001550
1551
mblighbb421852008-03-11 22:36:16 +00001552class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001553 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001554 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001555 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001556
1557
jadmanski0afbb632008-06-06 21:10:57 +00001558 def run(self):
1559 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001560
1561
jadmanski0afbb632008-06-06 21:10:57 +00001562 def prolog(self):
1563 # recovering an existing process - don't do prolog
1564 pass
mblighbb421852008-03-11 22:36:16 +00001565
1566
showardd3dc1992009-04-22 21:01:40 +00001567class PostJobTask(AgentTask):
1568 def __init__(self, queue_entries, pidfile_name, logfile_name,
1569 run_monitor=None):
1570 """
1571 If run_monitor != None, we're recovering a running task.
1572 """
1573 self._queue_entries = queue_entries
1574 self._pidfile_name = pidfile_name
1575 self._run_monitor = run_monitor
1576
1577 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1578 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1579 self._autoserv_monitor = PidfileRunMonitor()
1580 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1581 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1582
1583 if _testing_mode:
1584 command = 'true'
1585 else:
1586 command = self._generate_command(self._results_dir)
1587
1588 super(PostJobTask, self).__init__(cmd=command,
1589 working_directory=self._execution_tag)
1590
1591 self.log_file = os.path.join(self._execution_tag, logfile_name)
1592 self._final_status = self._determine_final_status()
1593
1594
1595 def _generate_command(self, results_dir):
1596 raise NotImplementedError('Subclasses must override this')
1597
1598
1599 def _job_was_aborted(self):
1600 was_aborted = None
1601 for queue_entry in self._queue_entries:
1602 queue_entry.update_from_database()
1603 if was_aborted is None: # first queue entry
1604 was_aborted = bool(queue_entry.aborted)
1605 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1606 email_manager.manager.enqueue_notify_email(
1607 'Inconsistent abort state',
1608 'Queue entries have inconsistent abort state: ' +
1609 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1610 # don't crash here, just assume true
1611 return True
1612 return was_aborted
1613
1614
1615 def _determine_final_status(self):
1616 if self._job_was_aborted():
1617 return models.HostQueueEntry.Status.ABORTED
1618
1619 # we'll use a PidfileRunMonitor to read the autoserv exit status
1620 if self._autoserv_monitor.exit_code() == 0:
1621 return models.HostQueueEntry.Status.COMPLETED
1622 return models.HostQueueEntry.Status.FAILED
1623
1624
1625 def run(self):
1626 if self._run_monitor is not None:
1627 self.monitor = self._run_monitor
1628 else:
1629 # make sure we actually have results to work with.
1630 # this should never happen in normal operation.
1631 if not self._autoserv_monitor.has_process():
1632 email_manager.manager.enqueue_notify_email(
1633 'No results in post-job task',
1634 'No results in post-job task at %s' %
1635 self._autoserv_monitor.pidfile_id)
1636 self.finished(False)
1637 return
1638
1639 super(PostJobTask, self).run(
1640 pidfile_name=self._pidfile_name,
1641 paired_with_pidfile=self._paired_with_pidfile)
1642
1643
1644 def _set_all_statuses(self, status):
1645 for queue_entry in self._queue_entries:
1646 queue_entry.set_status(status)
1647
1648
1649 def abort(self):
1650 # override AgentTask.abort() to avoid killing the process and ending
1651 # the task. post-job tasks continue when the job is aborted.
1652 pass
1653
1654
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in collect-crashinfo mode over all the job's hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        # decide whether to reboot, then either spawn a CleanupTask per host
        # or mark each host Ready directly
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # aborted jobs always trigger a reboot
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy/parse using the original autoserv monitor, not our own
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()
1711
1712
showard8fe93b52008-11-18 17:53:22 +00001713class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001714 def __init__(self, host=None, queue_entry=None):
1715 assert bool(host) ^ bool(queue_entry)
1716 if queue_entry:
1717 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001718 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001719 self.host = host
showard170873e2009-01-07 00:22:26 +00001720
1721 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001722 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1723 ['--cleanup'],
1724 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001725 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001726 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1727 failure_tasks=[repair_task])
1728
1729 self._set_ids(host=host, queue_entries=[queue_entry])
1730 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001731
mblighd5c95802008-03-05 00:33:46 +00001732
jadmanski0afbb632008-06-06 21:10:57 +00001733 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001734 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001735 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001736 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001737
mblighd5c95802008-03-05 00:33:46 +00001738
showard21baa452008-10-21 00:08:39 +00001739 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001740 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001741 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001742 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001743 self.host.update_field('dirty', 0)
1744
1745
showardd3dc1992009-04-22 21:01:40 +00001746class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001747 _num_running_parses = 0
1748
showardd3dc1992009-04-22 21:01:40 +00001749 def __init__(self, queue_entries, run_monitor=None):
1750 super(FinalReparseTask, self).__init__(queue_entries,
1751 pidfile_name=_PARSER_PID_FILE,
1752 logfile_name='.parse.log',
1753 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001754 # don't use _set_ids, since we don't want to set the host_ids
1755 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001756 self._parse_started = False
1757
showard97aed502008-11-04 02:01:24 +00001758
1759 @classmethod
1760 def _increment_running_parses(cls):
1761 cls._num_running_parses += 1
1762
1763
1764 @classmethod
1765 def _decrement_running_parses(cls):
1766 cls._num_running_parses -= 1
1767
1768
1769 @classmethod
1770 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001771 return (cls._num_running_parses <
1772 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001773
1774
1775 def prolog(self):
1776 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001777 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001778
1779
1780 def epilog(self):
1781 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001782 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001783
1784
showardd3dc1992009-04-22 21:01:40 +00001785 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001786 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001787 results_dir]
showard97aed502008-11-04 02:01:24 +00001788
1789
1790 def poll(self):
1791 # override poll to keep trying to start until the parse count goes down
1792 # and we can, at which point we revert to default behavior
1793 if self._parse_started:
1794 super(FinalReparseTask, self).poll()
1795 else:
1796 self._try_starting_parse()
1797
1798
1799 def run(self):
1800 # override run() to not actually run unless we can
1801 self._try_starting_parse()
1802
1803
1804 def _try_starting_parse(self):
1805 if not self._can_run_new_parse():
1806 return
showard170873e2009-01-07 00:22:26 +00001807
showard97aed502008-11-04 02:01:24 +00001808 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001809 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001810
showard97aed502008-11-04 02:01:24 +00001811 self._increment_running_parses()
1812 self._parse_started = True
1813
1814
1815 def finished(self, success):
1816 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001817 if self._parse_started:
1818 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001819
1820
showardc9ae1782009-01-30 01:42:37 +00001821class SetEntryPendingTask(AgentTask):
1822 def __init__(self, queue_entry):
1823 super(SetEntryPendingTask, self).__init__(cmd='')
1824 self._queue_entry = queue_entry
1825 self._set_ids(queue_entries=[queue_entry])
1826
1827
1828 def run(self):
1829 agent = self._queue_entry.on_pending()
1830 if agent:
1831 self.agent.dispatcher.add_agent(agent)
1832 self.finished(True)
1833
1834
showarda3c58572009-03-12 20:36:59 +00001835class DBError(Exception):
1836 """Raised by the DBObject constructor when its select fails."""
1837
1838
mbligh36768f02008-02-22 18:28:33 +00001839class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001840 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001841
1842 # Subclasses MUST override these:
1843 _table_name = ''
1844 _fields = ()
1845
showarda3c58572009-03-12 20:36:59 +00001846 # A mapping from (type, id) to the instance of the object for that
1847 # particular id. This prevents us from creating new Job() and Host()
1848 # instances for every HostQueueEntry object that we instantiate as
1849 # multiple HQEs often share the same Job.
1850 _instances_by_type_and_id = weakref.WeakValueDictionary()
1851 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001852
showarda3c58572009-03-12 20:36:59 +00001853
1854 def __new__(cls, id=None, **kwargs):
1855 """
1856 Look to see if we already have an instance for this particular type
1857 and id. If so, use it instead of creating a duplicate instance.
1858 """
1859 if id is not None:
1860 instance = cls._instances_by_type_and_id.get((cls, id))
1861 if instance:
1862 return instance
1863 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1864
1865
1866 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001867 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001868 assert self._table_name, '_table_name must be defined in your class'
1869 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001870 if not new_record:
1871 if self._initialized and not always_query:
1872 return # We've already been initialized.
1873 if id is None:
1874 id = row[0]
1875 # Tell future constructors to use us instead of re-querying while
1876 # this instance is still around.
1877 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001878
showard6ae5ea92009-02-25 00:11:51 +00001879 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001880
jadmanski0afbb632008-06-06 21:10:57 +00001881 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001882
jadmanski0afbb632008-06-06 21:10:57 +00001883 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001884 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001885
showarda3c58572009-03-12 20:36:59 +00001886 if self._initialized:
1887 differences = self._compare_fields_in_row(row)
1888 if differences:
showard7629f142009-03-27 21:02:02 +00001889 logging.warn(
1890 'initialized %s %s instance requery is updating: %s',
1891 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001892 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001893 self._initialized = True
1894
1895
1896 @classmethod
1897 def _clear_instance_cache(cls):
1898 """Used for testing, clear the internal instance cache."""
1899 cls._instances_by_type_and_id.clear()
1900
1901
showardccbd6c52009-03-21 00:10:21 +00001902 def _fetch_row_from_db(self, row_id):
1903 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1904 rows = _db.execute(sql, (row_id,))
1905 if not rows:
showard76e29d12009-04-15 21:53:10 +00001906 raise DBError("row not found (table=%s, row id=%s)"
1907 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00001908 return rows[0]
1909
1910
showarda3c58572009-03-12 20:36:59 +00001911 def _assert_row_length(self, row):
1912 assert len(row) == len(self._fields), (
1913 "table = %s, row = %s/%d, fields = %s/%d" % (
1914 self.__table, row, len(row), self._fields, len(self._fields)))
1915
1916
1917 def _compare_fields_in_row(self, row):
1918 """
1919 Given a row as returned by a SELECT query, compare it to our existing
1920 in memory fields.
1921
1922 @param row - A sequence of values corresponding to fields named in
1923 The class attribute _fields.
1924
1925 @returns A dictionary listing the differences keyed by field name
1926 containing tuples of (current_value, row_value).
1927 """
1928 self._assert_row_length(row)
1929 differences = {}
1930 for field, row_value in itertools.izip(self._fields, row):
1931 current_value = getattr(self, field)
1932 if current_value != row_value:
1933 differences[field] = (current_value, row_value)
1934 return differences
showard2bab8f42008-11-12 18:15:22 +00001935
1936
1937 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001938 """
1939 Update our field attributes using a single row returned by SELECT.
1940
1941 @param row - A sequence of values corresponding to fields named in
1942 the class fields list.
1943 """
1944 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001945
showard2bab8f42008-11-12 18:15:22 +00001946 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001947 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001948 setattr(self, field, value)
1949 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001950
showard2bab8f42008-11-12 18:15:22 +00001951 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001952
mblighe2586682008-02-29 22:45:46 +00001953
showardccbd6c52009-03-21 00:10:21 +00001954 def update_from_database(self):
1955 assert self.id is not None
1956 row = self._fetch_row_from_db(self.id)
1957 self._update_fields_from_row(row)
1958
1959
jadmanski0afbb632008-06-06 21:10:57 +00001960 def count(self, where, table = None):
1961 if not table:
1962 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001963
jadmanski0afbb632008-06-06 21:10:57 +00001964 rows = _db.execute("""
1965 SELECT count(*) FROM %s
1966 WHERE %s
1967 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001968
jadmanski0afbb632008-06-06 21:10:57 +00001969 assert len(rows) == 1
1970
1971 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001972
1973
showardd3dc1992009-04-22 21:01:40 +00001974 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00001975 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001976
showard2bab8f42008-11-12 18:15:22 +00001977 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001978 return
mbligh36768f02008-02-22 18:28:33 +00001979
mblighf8c624d2008-07-03 16:58:45 +00001980 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00001981 _db.execute(query, (value, self.id))
1982
showard2bab8f42008-11-12 18:15:22 +00001983 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001984
1985
jadmanski0afbb632008-06-06 21:10:57 +00001986 def save(self):
1987 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001988 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001989 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00001990 values = []
1991 for key in keys:
1992 value = getattr(self, key)
1993 if value is None:
1994 values.append('NULL')
1995 else:
1996 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00001997 values_str = ','.join(values)
1998 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1999 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002000 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002001 # Update our id to the one the database just assigned to us.
2002 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002003
2004
jadmanski0afbb632008-06-06 21:10:57 +00002005 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002006 self._instances_by_type_and_id.pop((type(self), id), None)
2007 self._initialized = False
2008 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002009 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2010 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002011
2012
showard63a34772008-08-18 19:32:50 +00002013 @staticmethod
2014 def _prefix_with(string, prefix):
2015 if string:
2016 string = prefix + string
2017 return string
2018
2019
jadmanski0afbb632008-06-06 21:10:57 +00002020 @classmethod
showard989f25d2008-10-01 11:38:11 +00002021 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002022 """
2023 Construct instances of our class based on the given database query.
2024
2025 @yields One class instance for each row fetched.
2026 """
showard63a34772008-08-18 19:32:50 +00002027 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2028 where = cls._prefix_with(where, 'WHERE ')
2029 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002030 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002031 'joins' : joins,
2032 'where' : where,
2033 'order_by' : order_by})
2034 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002035 for row in rows:
2036 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002037
mbligh36768f02008-02-22 18:28:33 +00002038
class IneligibleHostQueue(DBObject):
    """ORM wrapper for rows of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002042
2043
showard89f84db2009-03-12 20:39:13 +00002044class AtomicGroup(DBObject):
2045 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002046 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2047 'invalid')
showard89f84db2009-03-12 20:39:13 +00002048
2049
showard989f25d2008-10-01 11:38:11 +00002050class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002051 _table_name = 'labels'
2052 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002053 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002054
2055
mbligh36768f02008-02-22 18:28:33 +00002056class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002057 _table_name = 'hosts'
2058 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2059 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2060
2061
jadmanski0afbb632008-06-06 21:10:57 +00002062 def current_task(self):
2063 rows = _db.execute("""
2064 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2065 """, (self.id,))
2066
2067 if len(rows) == 0:
2068 return None
2069 else:
2070 assert len(rows) == 1
2071 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002072 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002073
2074
jadmanski0afbb632008-06-06 21:10:57 +00002075 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002076 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002077 if self.current_task():
2078 self.current_task().requeue()
2079
showard6ae5ea92009-02-25 00:11:51 +00002080
jadmanski0afbb632008-06-06 21:10:57 +00002081 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002082 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002083 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002084
2085
showard170873e2009-01-07 00:22:26 +00002086 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002087 """
showard170873e2009-01-07 00:22:26 +00002088 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002089 """
2090 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002091 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002092 FROM labels
2093 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002094 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002095 ORDER BY labels.name
2096 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002097 platform = None
2098 all_labels = []
2099 for label_name, is_platform in rows:
2100 if is_platform:
2101 platform = label_name
2102 all_labels.append(label_name)
2103 return platform, all_labels
2104
2105
2106 def reverify_tasks(self):
2107 cleanup_task = CleanupTask(host=self)
2108 verify_task = VerifyTask(host=self)
2109 # just to make sure this host does not get taken away
2110 self.set_status('Cleaning')
2111 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002112
2113
class HostQueueEntry(DBObject):
    """A row in host_queue_entries: one (job, host) scheduling slot.

    Tracks the entry's lifecycle status, its execution subdirectory under
    the job's results directory, and abort bookkeeping.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted')


    def __init__(self, id=None, row=None, **kwargs):
        # Must be constructed from either a database id or a raw row.
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        # host_id may be unset for meta-host / atomic-group entries that
        # have not yet been assigned a concrete host.
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """Assign (or, if host is None, release) this entry's host.

        Assigning marks the entry active and blocks the host from being
        reused by this job; releasing removes that block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently assigned Host object (may be None)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set execution_subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Display-safe accessor: works even when no host is assigned.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Set this entry's status and derive the active/complete flags.

        May also send a notification email for the new status and, if the
        whole job has now finished, a job-completion summary.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Not started yet: neither active nor complete.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # In progress: active, not complete.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states: complete, no longer active.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job has finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """Hand this entry to its job for running; returns the job's Agent.

        assigned_host is required for meta-host or atomic-group entries,
        which have no concrete host until scheduling time.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to Queued, releasing a meta-host's host."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry (None if not aborted).
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort (None if not aborted).
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """Abort this entry, with handling dependent on its current status."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
            and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job id>-<owner>/<execution_subdir>' for results paths."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002368
2369
class Job(DBObject):
    """A row in the jobs table: one submitted job and its scheduling logic."""
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        # Must be constructed from either a database id or a raw row.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 denotes a client-side job; everything else runs
        # server-side.  NOTE(review): presumably mirrors the frontend's
        # control-type enum -- confirm against frontend.afe.models.
        return self.control_type != 2


    def tag(self):
        """Return this job's results-directory tag, '<id>-<owner>'."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
                """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # every job must have at least one queue entry
        assert entries

        return entries


    def set_status(self, status, update_queues=False):
        """Set the job's status; optionally propagate it to every entry."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Queryset of this job's Queued/Pending (and optionally Verifying)
        entries."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark every not-yet-run entry Stopped, freeing any Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job if too few entries remain to satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir for this job."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Attach the job's control file to the execution; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing the given entry's execution_subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for a group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        # client-side jobs need autoserv's -c flag
        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """True if the reboot_before policy requires a cleanup first."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """True if the host should be verified before running the job."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Return the cleanup/verify/pending task list preceding the job."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give the entries a shared execution subdir (hostname or groupN)."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """Pick up to synch_count Pending entries (always including the
        given one), assign them a new group, and return them."""
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """Run this job for the given entry, or start pre-job verification.

        Returns an Agent to be executed by the dispatcher.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """Mark the entries Starting and return an Agent that runs
        initial_tasks followed by the main QueueTask.

        initial_tasks defaults to no extra tasks.  (Fixed: the original
        used a mutable default argument, `initial_tasks=[]`, shared across
        calls; also removed the unused local `entry_ids`.)
        """
        if initial_tasks is None:
            initial_tasks = []
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = initial_tasks + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002591
2592
if __name__ == '__main__':
    # Script entry point; main() is defined elsewhere in this module.
    main()