blob: 93718fe827863d7df5a42f196a21e14a42259d36 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
# Nice level applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest installation; defaults to the parent of this file but
# can be overridden with the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server code importable regardless of how we were launched.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into an execution's results directory by the three
# phases of a job: the main autoserv run, crashinfo collection, and parsing.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, initialized in init() /
# main_without_exception_handling().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The .ini file reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
def main():
    """
    Entry point: run the scheduler, making sure any exception escaping the
    main loop is written to the log before the process dies.
    """
    try:
        main_without_exception_handling()
    except:
        # Bare except is deliberate here: this is the outermost boundary and
        # we want everything logged before re-raising.
        logging.exception('Exception escaping in monitor_db')
        raise
84
85
def main_without_exception_handling():
    """
    Parse command-line options, load configuration into the module globals,
    start the status server, then run the dispatcher tick loop until a
    shutdown is requested (see handle_sigint).
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # Statuses listed here (comma/space/semicolon/colon separated) trigger
    # notification emails; stored lowercased in a module global.
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Log the stacktrace via email, then fall through to the shutdown
        # sequence below rather than dying immediately.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly shutdown: flush emails, stop the status server, stop drones,
    # and finally drop the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000161
162
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000167
168
def init(logfile):
    """
    One-time scheduler startup: set up logging redirection, write the pid
    file, connect to the database, install the SIGINT handler and initialize
    the drone manager from configuration.

    @param logfile - optional path; when set, stdout/stderr are redirected
            there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported 'drones' module
    # for the rest of this function -- harmless here, but fragile.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def enable_logging(logfile):
    """
    Redirect all scheduler output to append-mode, unbuffered log files.

    stdout goes to `logfile` and stderr to `logfile + ".err"`.  Both the
    underlying file descriptors (via dup2, so child processes are covered)
    and the sys-level stream objects are replaced.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())
    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000214
215
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the argv list for launching autoserv against `machines`.

    @param machines - comma-separated hostnames passed to autoserv -m.
    @param results_dir - results directory, made absolute via the drone
            manager.
    @param extra_args - additional arguments appended to the command line.
    @param job, queue_entry - optional; when either is given, the job's
            owner and name are passed along (-u/-l).  If only queue_entry
            is supplied, its job is used.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    return command + extra_args
225
226
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
229
230
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to available hosts.

    refresh() snapshots the relevant scheduling state (ready hosts, ACLs,
    label dependencies) from the database; the find_eligible_* methods then
    consume that cached state, removing hosts from the in-memory pool as
    they are handed out during a scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a list of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run `query` (containing one %s for an id list) and fold the
        two-column result into a {left_id: set(right_id)} dict; see
        _process_many2many_dict for the `flip` semantics.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Turn two-column rows into {first: set(second)}; with flip=True the
        columns are reversed, giving {second: set(first)}.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of its owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return the label<->host mapping in both directions:
        ({label_id: set(host_id)}, {host_id: set(label_id)}).
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Snapshot all scheduling state needed to place the given pending
        queue entries: ready hosts, job/host ACLs, dependencies and labels.
        Must be called before the find_eligible_* methods each cycle.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Enforce only_if_needed labels: a host carrying such a label may only
        be used if the job explicitly requested it (as a dependency or as
        the metahost label).  Host-specific (non-metahost) entries bypass
        this check entirely.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for this queue entry."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """
        Claim the entry's specific host if it is eligible; returns the Host
        (removed from the available pool) or None.
        """
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still in the pool and valid for metahost use."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Pick any eligible host carrying the entry's metahost label; returns
        the Host (removed from the cached pools) or None if none qualify.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Find a host for a non-atomic queue entry, dispatching to the
        host-specific or metahost path.  Returns a Host or None.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
575
576
showard170873e2009-01-07 00:22:26 +0000577class Dispatcher(object):
    def __init__(self):
        """Set up empty agent tracking state and the cleanup/scheduling helpers."""
        # all currently-running agents
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # indexes from host id / queue entry id to the set of agents using them
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000588
mbligh36768f02008-02-22 18:28:33 +0000589
    def initialize(self, recover_hosts=True):
        """
        One-time startup: initialize the cleanup tasks, then recover any
        processes (always) and hosts (optionally) left over from a previous
        scheduler run.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000599
600
    def tick(self):
        """
        One iteration of the main scheduling loop.  The order matters:
        refresh drone state, run cleanups, process aborts and recurring
        runs, schedule new work, drive existing agents, then flush the
        queued drone actions and notification emails.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000610
showard97aed502008-11-04 02:01:24 +0000611
mblighf3294cc2009-04-08 21:17:38 +0000612 def _run_cleanup(self):
613 self._periodic_cleanup.run_cleanup_maybe()
614 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000615
mbligh36768f02008-02-22 18:28:33 +0000616
showard170873e2009-01-07 00:22:26 +0000617 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
618 for object_id in object_ids:
619 agent_dict.setdefault(object_id, set()).add(agent)
620
621
622 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
623 for object_id in object_ids:
624 assert object_id in agent_dict
625 agent_dict[object_id].remove(agent)
626
627
jadmanski0afbb632008-06-06 21:10:57 +0000628 def add_agent(self, agent):
629 self._agents.append(agent)
630 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000631 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
632 self._register_agent_for_ids(self._queue_entry_agents,
633 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000634
showard170873e2009-01-07 00:22:26 +0000635
636 def get_agents_for_entry(self, queue_entry):
637 """
638 Find agents corresponding to the specified queue_entry.
639 """
showardd3dc1992009-04-22 21:01:40 +0000640 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000641
642
643 def host_has_agent(self, host):
644 """
645 Determine if there is currently an Agent present using this host.
646 """
647 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000648
649
jadmanski0afbb632008-06-06 21:10:57 +0000650 def remove_agent(self, agent):
651 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000652 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
653 agent)
654 self._unregister_agent_for_ids(self._queue_entry_agents,
655 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000656
657
    def _recover_processes(self):
        """
        Reattach to autoserv/parser processes left running by a previous
        scheduler instance: register their pidfiles, refresh drone state,
        recover what can be recovered and requeue/reverify the rest.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000668
showard170873e2009-01-07 00:22:26 +0000669
670 def _register_pidfiles(self):
671 # during recovery we may need to read pidfiles for both running and
672 # parsing entries
673 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000674 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000675 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000676 for pidfile_name in _ALL_PIDFILE_NAMES:
677 pidfile_id = _drone_manager.get_pidfile_id_from(
678 queue_entry.execution_tag(), pidfile_name=pidfile_name)
679 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000680
681
    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        """
        Generic recovery driver: for each queue entry in `status` that has no
        agent yet, try to reattach to its process via `pidfile_name` and hand
        the entry group to `recover_entries_fn(job, entries, run_monitor)` --
        run_monitor is None when no process was ever started.  Recovered
        processes are removed from the `orphans` set so they won't be killed.
        """
        queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            # NOTE(review): this rebinds `queue_entries` (the loop iterable)
            # to the entry's group.  It works because the for-loop iterator
            # was already captured, but a distinct name would be clearer.
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            execution_tag = queue_entry.execution_tag()
            run_monitor = PidfileRunMonitor()
            run_monitor.attach_to_existing_process(execution_tag,
                                                   pidfile_name=pidfile_name)
            if not run_monitor.has_process():
                # execution apparently never happened
                recover_entries_fn(queue_entry.job, queue_entries, None)
                continue

            logging.info('Recovering %s entry %s (process %s)',
                         status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         run_monitor.get_process())
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
            orphans.discard(run_monitor.get_process())
705
706
707 def _kill_remaining_orphan_processes(self, orphans):
708 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000709 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000710 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000711
showard170873e2009-01-07 00:22:26 +0000712
showardd3dc1992009-04-22 21:01:40 +0000713 def _recover_running_entries(self, orphans):
714 def recover_entries(job, queue_entries, run_monitor):
715 if run_monitor is not None:
716 queue_task = RecoveryQueueTask(job=job,
717 queue_entries=queue_entries,
718 run_monitor=run_monitor)
719 self.add_agent(Agent(tasks=[queue_task],
720 num_processes=len(queue_entries)))
721 # else, _requeue_other_active_entries will cover this
722
723 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
724 orphans, '.autoserv_execute',
725 recover_entries)
726
727
728 def _recover_gathering_entries(self, orphans):
729 def recover_entries(job, queue_entries, run_monitor):
730 gather_task = GatherLogsTask(job, queue_entries,
731 run_monitor=run_monitor)
732 self.add_agent(Agent([gather_task]))
733
734 self._recover_entries_with_status(
735 models.HostQueueEntry.Status.GATHERING,
736 orphans, _CRASHINFO_PID_FILE, recover_entries)
737
738
739 def _recover_parsing_entries(self, orphans):
740 def recover_entries(job, queue_entries, run_monitor):
741 reparse_task = FinalReparseTask(queue_entries,
742 run_monitor=run_monitor)
743 self.add_agent(Agent([reparse_task], num_processes=0))
744
745 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
746 orphans, _PARSER_PID_FILE,
747 recover_entries)
748
749
750 def _recover_all_recoverable_entries(self):
751 orphans = _drone_manager.get_orphaned_autoserv_processes()
752 self._recover_running_entries(orphans)
753 self._recover_gathering_entries(orphans)
754 self._recover_parsing_entries(orphans)
755 self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000756
showard97aed502008-11-04 02:01:24 +0000757
showard170873e2009-01-07 00:22:26 +0000758 def _requeue_other_active_entries(self):
759 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000760 where='active AND NOT complete AND '
761 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000762 for queue_entry in queue_entries:
763 if self.get_agents_for_entry(queue_entry):
764 # entry has already been recovered
765 continue
showardd3dc1992009-04-22 21:01:40 +0000766 if queue_entry.aborted:
767 queue_entry.abort(self)
768 continue
769
770 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000771 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000772 if queue_entry.host:
773 tasks = queue_entry.host.reverify_tasks()
774 self.add_agent(Agent(tasks))
775 agent = queue_entry.requeue()
776
777
    def _reverify_remaining_hosts(self):
        """Reverify hosts whose recorded state implies no live process."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000793
794
jadmanski0afbb632008-06-06 21:10:57 +0000795 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000796 print_message='Reverifying host %s'):
797 full_where='locked = 0 AND invalid = 0 AND ' + where
798 for host in Host.fetch(where=full_where):
799 if self.host_has_agent(host):
800 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000801 continue
showard170873e2009-01-07 00:22:26 +0000802 if print_message:
showardb18134f2009-03-20 20:52:18 +0000803 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000804 tasks = host.reverify_tasks()
805 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000806
807
jadmanski0afbb632008-06-06 21:10:57 +0000808 def _recover_hosts(self):
809 # recover "Repair Failed" hosts
810 message = 'Reverifying dead host %s'
811 self._reverify_hosts_where("status = 'Repair Failed'",
812 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000813
814
showard04c82c52008-05-29 19:38:12 +0000815
showardb95b1bd2008-08-15 18:11:04 +0000816 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000817 # prioritize by job priority, then non-metahost over metahost, then FIFO
818 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000819 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000820 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000821 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000822
823
showard89f84db2009-03-12 20:39:13 +0000824 def _refresh_pending_queue_entries(self):
825 """
826 Lookup the pending HostQueueEntries and call our HostScheduler
827 refresh() method given that list. Return the list.
828
829 @returns A list of pending HostQueueEntries sorted in priority order.
830 """
showard63a34772008-08-18 19:32:50 +0000831 queue_entries = self._get_pending_queue_entries()
832 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000833 return []
showardb95b1bd2008-08-15 18:11:04 +0000834
showard63a34772008-08-18 19:32:50 +0000835 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000836
showard89f84db2009-03-12 20:39:13 +0000837 return queue_entries
838
839
840 def _schedule_atomic_group(self, queue_entry):
841 """
842 Schedule the given queue_entry on an atomic group of hosts.
843
844 Returns immediately if there are insufficient available hosts.
845
846 Creates new HostQueueEntries based off of queue_entry for the
847 scheduled hosts and starts them all running.
848 """
849 # This is a virtual host queue entry representing an entire
850 # atomic group, find a group and schedule their hosts.
851 group_hosts = self._host_scheduler.find_eligible_atomic_group(
852 queue_entry)
853 if not group_hosts:
854 return
855 # The first assigned host uses the original HostQueueEntry
856 group_queue_entries = [queue_entry]
857 for assigned_host in group_hosts[1:]:
858 # Create a new HQE for every additional assigned_host.
859 new_hqe = HostQueueEntry.clone(queue_entry)
860 new_hqe.save()
861 group_queue_entries.append(new_hqe)
862 assert len(group_queue_entries) == len(group_hosts)
863 for queue_entry, host in itertools.izip(group_queue_entries,
864 group_hosts):
865 self._run_queue_entry(queue_entry, host)
866
867
868 def _schedule_new_jobs(self):
869 queue_entries = self._refresh_pending_queue_entries()
870 if not queue_entries:
871 return
872
showard63a34772008-08-18 19:32:50 +0000873 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000874 if (queue_entry.atomic_group_id is None or
875 queue_entry.host_id is not None):
876 assigned_host = self._host_scheduler.find_eligible_host(
877 queue_entry)
878 if assigned_host:
879 self._run_queue_entry(queue_entry, assigned_host)
880 else:
881 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000882
883
884 def _run_queue_entry(self, queue_entry, host):
885 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000886 # in some cases (synchronous jobs with run_verify=False), agent may be
887 # None
showard9976ce92008-10-15 20:28:13 +0000888 if agent:
889 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000890
891
jadmanski0afbb632008-06-06 21:10:57 +0000892 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000893 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
894 for agent in self.get_agents_for_entry(entry):
895 agent.abort()
896 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000897
898
showard324bf812009-01-20 23:23:38 +0000899 def _can_start_agent(self, agent, num_started_this_cycle,
900 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000901 # always allow zero-process agents to run
902 if agent.num_processes == 0:
903 return True
904 # don't allow any nonzero-process agents to run after we've reached a
905 # limit (this avoids starvation of many-process agents)
906 if have_reached_limit:
907 return False
908 # total process throttling
showard324bf812009-01-20 23:23:38 +0000909 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000910 return False
911 # if a single agent exceeds the per-cycle throttling, still allow it to
912 # run when it's the first agent in the cycle
913 if num_started_this_cycle == 0:
914 return True
915 # per-cycle throttling
916 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000917 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000918 return False
919 return True
920
921
jadmanski0afbb632008-06-06 21:10:57 +0000922 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000923 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000924 have_reached_limit = False
925 # iterate over copy, so we can remove agents during iteration
926 for agent in list(self._agents):
927 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000928 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000929 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000930 continue
931 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000932 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000933 have_reached_limit):
934 have_reached_limit = True
935 continue
showard4c5374f2008-09-04 17:02:56 +0000936 num_started_this_cycle += agent.num_processes
937 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000938 logging.info('%d running processes',
939 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000940
941
    def _process_recurring_runs(self):
        """
        Instantiate jobs from RecurringRun templates whose start date has
        passed, then reschedule or delete each run based on its remaining
        loop count.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts must be re-created for the new job
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         name=job.name,
                                         priority=job.priority,
                                         control_file=job.control_file,
                                         control_type=job.control_type,
                                         is_template=False,
                                         synch_count=job.synch_count,
                                         timeout=job.timeout,
                                         run_verify=job.run_verify,
                                         email_list=job.email_list,
                                         dependencies=dependencies,
                                         reboot_before=job.reboot_before,
                                         reboot_after=job.reboot_after,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # job-creation failure is logged but deliberately does not
                # stop the rescheduling bookkeeping below
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration: the recurring run is finished
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
                # NOTE(review): for infinite runs (loop_count == 0) the
                # start_date is never advanced, so a new job appears to be
                # created on every scheduler cycle -- confirm this is intended
991
992
class PidfileRunMonitor(object):
    """
    Tracks one drone-managed process (autoserv or parser) through its
    pidfile.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set permanently once we give up on the process; see on_lost_process()
        self.lost_process = False
        # wall-clock time of run()/attach, used for the pidfile-write timeout
        self._start_time = None
        self.pidfile_id = None
        # last parsed pidfile contents (process, exit_status, num_tests_failed)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # Wrap command with nice at the given level; no-op for a falsy level.
        # NOTE(review): run() below inlines this same wrapping instead of
        # calling this helper -- looks like duplicated logic.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # remember when we started watching, for the pidfile-write timeout
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and begin monitoring it."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its existing pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        # kill the monitored process, if one was ever found
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        # refresh state first; may transition us to lost_process
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        # only valid once has_process() is true
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # Refresh self._state from the pidfile; raises _PidfileException on
        # unparseable contents (state is reset first so it never goes stale).
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log + email the problem, then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # once lost, state is frozen -- don't re-read the pidfile
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # only escalate by email once the pidfile has been missing longer
        # than the allowed grace period
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
        self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # synthesize a failed run: nonzero exit, no test failures recorded
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        # None while running; synthesized to 1 if the process was lost
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        # only meaningful after the process has finished (or was lost)
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
1151
class Agent(object):
    """
    Runs an ordered queue of AgentTasks on behalf of the dispatcher, one
    task at a time.

    host_ids and queue_entry_ids are unions over all tasks and let the
    dispatcher find the agent responsible for a given host or queue entry.
    """
    def __init__(self, tasks, num_processes=1):
        # tasks: AgentTasks to run, in order
        # num_processes: process slots this agent counts against throttling
        self.active_task = None
        self.queue = None
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # replace the pending-task queue with a fresh, unbounded one
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # enqueue and take ownership of the task
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # poll the active task; as long as tasks finish immediately, keep
        # advancing through the queue within this same tick
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # finalize the finished active task (if any) and start the next one
        logging.info("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        # drop all remaining queued tasks
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher
        self._next_task()


    def abort(self):
        if self.active_task:
            self.active_task.abort()
            if not self.active_task.aborted:  # tasks can choose to ignore aborts
                return
            self.active_task = None

        # no (remaining) active task: discard everything still queued
        self._clear_queue()
1233
showardd3dc1992009-04-22 21:01:40 +00001234
1235
class AgentTask(object):
    """
    One unit of work executed by an Agent -- usually a single autoserv or
    parser process tracked through a PidfileRunMonitor.

    Subclasses override prolog()/epilog() and supply cmd; the base class
    drives the start/poll/finished lifecycle.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command line (list) to execute, or None for no process.
        @param working_directory: directory the process runs in.
        @param failure_tasks: tasks to run (in a fresh Agent) if this task
                fails; defaults to no tasks.
        @param pidfile_name, paired_with_pidfile: accepted for interface
                compatibility; not stored here (run() takes its own copies).
        """
        self.done = False
        # build a fresh list per instance -- the old default of [] was a
        # mutable default argument shared by every AgentTask
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        # None while undecided, then True/False once finished() runs
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/queue entries this task is responsible for, so
        # the owning Agent can be looked up by id
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # a task with no monitor has no process to wait on -- fail it now
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # exit_code is None while the process is still running
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        # record the outcome and give subclasses their epilog hook
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: suffix is accepted for interface compatibility but unused --
        # the drone manager names the temporary path itself
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve the process log in the results repository, when we have
        # both a process and a destination
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses extend."""
        self.cleanup()


    def start(self):
        assert self.agent

        # prolog/run happen only on the first start
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp the name so repeated tasks on a host don't collide
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all entries in a group must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Copy results to the repository and kick off a final reparse."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if any) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001366
1367
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        # single "key=value" keyval-file line
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # raise the exception type -- the old "raise NotImplemented" raised
        # the NotImplemented constant, which produces a TypeError instead
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line, paired with the job's finished process."""
        assert self.monitor
        if not self.monitor.has_process():
            # no process means nowhere to pair the write -- silently skip
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        # (key, value) pair recording when the job was queued, as epoch secs
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))
1396
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host; on failure, fails the associated
    queue entry (if any)."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair output goes to a temporary dir; it is copied into the job's
        # results only if the repair fails (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and requeue the entry we may later fail."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in the temporary repair results dir (TaskWithJobKeyvals)
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """After a failed repair, fail the queue entry: write job keyvals,
        copy the repair logs into the job's results, parse them, and let the
        entry handle the host failure."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Set the host's final status; on failure, fail the queue entry."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001465
1466
showard8fe93b52008-11-18 17:53:22 +00001467class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001468 def epilog(self):
1469 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001470 should_copy_results = (self.queue_entry and not self.success
1471 and not self.queue_entry.meta_host)
1472 if should_copy_results:
1473 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001474 destination = os.path.join(self.queue_entry.execution_tag(),
1475 os.path.basename(self.log_file))
1476 _drone_manager.copy_to_results_repository(
1477 self.monitor.get_process(), self.log_file,
1478 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001479
1480
class VerifyTask(PreJobTask):
    """Pre-job task that runs 'autoserv -v' against a host.

    On failure, a RepairTask for the same host/entry is run as follow-up.
    """
    def __init__(self, queue_entry=None, host=None):
        # callers supply exactly one of queue_entry or host
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        repair = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=[repair])

        self.set_host_log_file('verify', self.host)
        # pass the host *parameter* (may be None), not self.host, so IDs are
        # only host-based when the task was constructed from a host
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001511
1512
showardd9205182009-04-27 20:09:55 +00001513class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001514 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001515 self.job = job
1516 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001517 super(QueueTask, self).__init__(cmd, self._execution_tag())
1518 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001519
1520
showard73ec0442009-02-07 02:05:20 +00001521 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001522 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001523
1524
1525 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1526 keyval_contents = '\n'.join(self._format_keyval(key, value)
1527 for key, value in keyval_dict.iteritems())
1528 # always end with a newline to allow additional keyvals to be written
1529 keyval_contents += '\n'
1530 _drone_manager.attach_file_to_execution(self._execution_tag(),
1531 keyval_contents,
1532 file_path=keyval_path)
1533
1534
1535 def _write_keyvals_before_job(self, keyval_dict):
1536 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1537
1538
showard170873e2009-01-07 00:22:26 +00001539 def _write_host_keyvals(self, host):
1540 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1541 host.hostname)
1542 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001543 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1544 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001545
1546
showard170873e2009-01-07 00:22:26 +00001547 def _execution_tag(self):
1548 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001549
1550
jadmanski0afbb632008-06-06 21:10:57 +00001551 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001552 queued_key, queued_time = self._job_queued_keyval(self.job)
1553 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001554 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001555 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001556 queue_entry.set_status('Running')
1557 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001558 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001559 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001560 assert len(self.queue_entries) == 1
1561 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001562
1563
showard35162b02009-03-03 02:17:30 +00001564 def _write_lost_process_error_file(self):
1565 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1566 _drone_manager.write_lines_to_file(error_file_path,
1567 [_LOST_PROCESS_ERROR])
1568
1569
showardd3dc1992009-04-22 21:01:40 +00001570 def _finish_task(self):
showardd9205182009-04-27 20:09:55 +00001571 self._write_job_finished()
1572
showardd3dc1992009-04-22 21:01:40 +00001573 # both of these conditionals can be true, iff the process ran, wrote a
1574 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001575 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001576 gather_task = GatherLogsTask(self.job, self.queue_entries)
1577 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001578
1579 if self.monitor.lost_process:
1580 self._write_lost_process_error_file()
1581 for queue_entry in self.queue_entries:
1582 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001583
1584
showardcbd74612008-11-19 21:42:02 +00001585 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001586 _drone_manager.write_lines_to_file(
1587 os.path.join(self._execution_tag(), 'status.log'),
1588 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001589 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001590
1591
jadmanskif7fa2cc2008-10-01 14:13:23 +00001592 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001593 if not self.monitor or not self.monitor.has_process():
1594 return
1595
jadmanskif7fa2cc2008-10-01 14:13:23 +00001596 # build up sets of all the aborted_by and aborted_on values
1597 aborted_by, aborted_on = set(), set()
1598 for queue_entry in self.queue_entries:
1599 if queue_entry.aborted_by:
1600 aborted_by.add(queue_entry.aborted_by)
1601 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1602 aborted_on.add(t)
1603
1604 # extract some actual, unique aborted by value and write it out
1605 assert len(aborted_by) <= 1
1606 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001607 aborted_by_value = aborted_by.pop()
1608 aborted_on_value = max(aborted_on)
1609 else:
1610 aborted_by_value = 'autotest_system'
1611 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001612
showarda0382352009-02-11 23:36:43 +00001613 self._write_keyval_after_job("aborted_by", aborted_by_value)
1614 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001615
showardcbd74612008-11-19 21:42:02 +00001616 aborted_on_string = str(datetime.datetime.fromtimestamp(
1617 aborted_on_value))
1618 self._write_status_comment('Job aborted by %s on %s' %
1619 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001620
1621
jadmanski0afbb632008-06-06 21:10:57 +00001622 def abort(self):
1623 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001624 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001625 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001626
1627
jadmanski0afbb632008-06-06 21:10:57 +00001628 def epilog(self):
1629 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001630 self._finish_task()
1631 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001632
1633
mblighbb421852008-03-11 22:36:16 +00001634class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001635 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001636 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001637 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001638
1639
jadmanski0afbb632008-06-06 21:10:57 +00001640 def run(self):
1641 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001642
1643
jadmanski0afbb632008-06-06 21:10:57 +00001644 def prolog(self):
1645 # recovering an existing process - don't do prolog
1646 pass
mblighbb421852008-03-11 22:36:16 +00001647
1648
showardd3dc1992009-04-22 21:01:40 +00001649class PostJobTask(AgentTask):
1650 def __init__(self, queue_entries, pidfile_name, logfile_name,
1651 run_monitor=None):
1652 """
1653 If run_monitor != None, we're recovering a running task.
1654 """
1655 self._queue_entries = queue_entries
1656 self._pidfile_name = pidfile_name
1657 self._run_monitor = run_monitor
1658
1659 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1660 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1661 self._autoserv_monitor = PidfileRunMonitor()
1662 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1663 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1664
1665 if _testing_mode:
1666 command = 'true'
1667 else:
1668 command = self._generate_command(self._results_dir)
1669
1670 super(PostJobTask, self).__init__(cmd=command,
1671 working_directory=self._execution_tag)
1672
1673 self.log_file = os.path.join(self._execution_tag, logfile_name)
1674 self._final_status = self._determine_final_status()
1675
1676
1677 def _generate_command(self, results_dir):
1678 raise NotImplementedError('Subclasses must override this')
1679
1680
1681 def _job_was_aborted(self):
1682 was_aborted = None
1683 for queue_entry in self._queue_entries:
1684 queue_entry.update_from_database()
1685 if was_aborted is None: # first queue entry
1686 was_aborted = bool(queue_entry.aborted)
1687 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1688 email_manager.manager.enqueue_notify_email(
1689 'Inconsistent abort state',
1690 'Queue entries have inconsistent abort state: ' +
1691 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1692 # don't crash here, just assume true
1693 return True
1694 return was_aborted
1695
1696
1697 def _determine_final_status(self):
1698 if self._job_was_aborted():
1699 return models.HostQueueEntry.Status.ABORTED
1700
1701 # we'll use a PidfileRunMonitor to read the autoserv exit status
1702 if self._autoserv_monitor.exit_code() == 0:
1703 return models.HostQueueEntry.Status.COMPLETED
1704 return models.HostQueueEntry.Status.FAILED
1705
1706
1707 def run(self):
1708 if self._run_monitor is not None:
1709 self.monitor = self._run_monitor
1710 else:
1711 # make sure we actually have results to work with.
1712 # this should never happen in normal operation.
1713 if not self._autoserv_monitor.has_process():
1714 email_manager.manager.enqueue_notify_email(
1715 'No results in post-job task',
1716 'No results in post-job task at %s' %
1717 self._autoserv_monitor.pidfile_id)
1718 self.finished(False)
1719 return
1720
1721 super(PostJobTask, self).run(
1722 pidfile_name=self._pidfile_name,
1723 paired_with_pidfile=self._paired_with_pidfile)
1724
1725
1726 def _set_all_statuses(self, status):
1727 for queue_entry in self._queue_entries:
1728 queue_entry.set_status(status)
1729
1730
1731 def abort(self):
1732 # override AgentTask.abort() to avoid killing the process and ending
1733 # the task. post-job tasks continue when the job is aborted.
1734 pass
1735
1736
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m',
                ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Either spawn a CleanupTask per host (reboot) or mark hosts Ready,
        depending on the job's reboot_after setting and the final status."""
        final_status = self._final_status
        if final_status == models.HostQueueEntry.Status.ABORTED:
            should_reboot = True
        else:
            reboot_after = self._job.reboot_after
            if reboot_after == models.RebootAfter.ALWAYS:
                should_reboot = True
            elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
                completed = (
                    final_status == models.HostQueueEntry.Status.COMPLETED)
                num_failed = self._autoserv_monitor.num_tests_failed()
                should_reboot = completed and num_failed == 0
            else:
                should_reboot = False

        for entry in self._queue_entries:
            if not should_reboot:
                entry.host.set_status('Ready')
                continue
            # don't pass the queue entry to the CleanupTask. if the cleanup
            # fails, the job doesn't care -- it's over.
            cleanup = CleanupTask(host=entry.host)
            self.agent.dispatcher.add_agent(Agent([cleanup]))


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        # no crashinfo collection needed if Autoserv exited successfully
        if self._final_status == models.HostQueueEntry.Status.COMPLETED:
            self.finished(True)
            return
        super(GatherLogsTask, self).run()
1801
1802
showard8fe93b52008-11-18 17:53:22 +00001803class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001804 def __init__(self, host=None, queue_entry=None):
1805 assert bool(host) ^ bool(queue_entry)
1806 if queue_entry:
1807 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001808 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001809 self.host = host
showard170873e2009-01-07 00:22:26 +00001810
1811 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001812 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1813 ['--cleanup'],
1814 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001815 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001816 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1817 failure_tasks=[repair_task])
1818
1819 self._set_ids(host=host, queue_entries=[queue_entry])
1820 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001821
mblighd5c95802008-03-05 00:33:46 +00001822
jadmanski0afbb632008-06-06 21:10:57 +00001823 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001824 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001825 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001826 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001827
mblighd5c95802008-03-05 00:33:46 +00001828
showard21baa452008-10-21 00:08:39 +00001829 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001830 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001831 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001832 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001833 self.host.update_field('dirty', 0)
1834
1835
showardd3dc1992009-04-22 21:01:40 +00001836class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001837 _num_running_parses = 0
1838
showardd3dc1992009-04-22 21:01:40 +00001839 def __init__(self, queue_entries, run_monitor=None):
1840 super(FinalReparseTask, self).__init__(queue_entries,
1841 pidfile_name=_PARSER_PID_FILE,
1842 logfile_name='.parse.log',
1843 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001844 # don't use _set_ids, since we don't want to set the host_ids
1845 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001846 self._parse_started = False
1847
showard97aed502008-11-04 02:01:24 +00001848
1849 @classmethod
1850 def _increment_running_parses(cls):
1851 cls._num_running_parses += 1
1852
1853
1854 @classmethod
1855 def _decrement_running_parses(cls):
1856 cls._num_running_parses -= 1
1857
1858
1859 @classmethod
1860 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001861 return (cls._num_running_parses <
1862 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001863
1864
1865 def prolog(self):
1866 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001867 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001868
1869
1870 def epilog(self):
1871 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001872 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001873
1874
showardd3dc1992009-04-22 21:01:40 +00001875 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001876 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001877 results_dir]
showard97aed502008-11-04 02:01:24 +00001878
1879
1880 def poll(self):
1881 # override poll to keep trying to start until the parse count goes down
1882 # and we can, at which point we revert to default behavior
1883 if self._parse_started:
1884 super(FinalReparseTask, self).poll()
1885 else:
1886 self._try_starting_parse()
1887
1888
1889 def run(self):
1890 # override run() to not actually run unless we can
1891 self._try_starting_parse()
1892
1893
1894 def _try_starting_parse(self):
1895 if not self._can_run_new_parse():
1896 return
showard170873e2009-01-07 00:22:26 +00001897
showard97aed502008-11-04 02:01:24 +00001898 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001899 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001900
showard97aed502008-11-04 02:01:24 +00001901 self._increment_running_parses()
1902 self._parse_started = True
1903
1904
1905 def finished(self, success):
1906 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001907 if self._parse_started:
1908 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001909
1910
showardc9ae1782009-01-30 01:42:37 +00001911class SetEntryPendingTask(AgentTask):
1912 def __init__(self, queue_entry):
1913 super(SetEntryPendingTask, self).__init__(cmd='')
1914 self._queue_entry = queue_entry
1915 self._set_ids(queue_entries=[queue_entry])
1916
1917
1918 def run(self):
1919 agent = self._queue_entry.on_pending()
1920 if agent:
1921 self.agent.dispatcher.add_agent(agent)
1922 self.finished(True)
1923
1924
showarda3c58572009-03-12 20:36:59 +00001925class DBError(Exception):
1926 """Raised by the DBObject constructor when its select fails."""
1927
1928
mbligh36768f02008-02-22 18:28:33 +00001929class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001930 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001931
1932 # Subclasses MUST override these:
1933 _table_name = ''
1934 _fields = ()
1935
showarda3c58572009-03-12 20:36:59 +00001936 # A mapping from (type, id) to the instance of the object for that
1937 # particular id. This prevents us from creating new Job() and Host()
1938 # instances for every HostQueueEntry object that we instantiate as
1939 # multiple HQEs often share the same Job.
1940 _instances_by_type_and_id = weakref.WeakValueDictionary()
1941 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001942
showarda3c58572009-03-12 20:36:59 +00001943
1944 def __new__(cls, id=None, **kwargs):
1945 """
1946 Look to see if we already have an instance for this particular type
1947 and id. If so, use it instead of creating a duplicate instance.
1948 """
1949 if id is not None:
1950 instance = cls._instances_by_type_and_id.get((cls, id))
1951 if instance:
1952 return instance
1953 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1954
1955
1956 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001957 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001958 assert self._table_name, '_table_name must be defined in your class'
1959 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001960 if not new_record:
1961 if self._initialized and not always_query:
1962 return # We've already been initialized.
1963 if id is None:
1964 id = row[0]
1965 # Tell future constructors to use us instead of re-querying while
1966 # this instance is still around.
1967 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001968
showard6ae5ea92009-02-25 00:11:51 +00001969 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001970
jadmanski0afbb632008-06-06 21:10:57 +00001971 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001972
jadmanski0afbb632008-06-06 21:10:57 +00001973 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001974 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001975
showarda3c58572009-03-12 20:36:59 +00001976 if self._initialized:
1977 differences = self._compare_fields_in_row(row)
1978 if differences:
showard7629f142009-03-27 21:02:02 +00001979 logging.warn(
1980 'initialized %s %s instance requery is updating: %s',
1981 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001982 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001983 self._initialized = True
1984
1985
1986 @classmethod
1987 def _clear_instance_cache(cls):
1988 """Used for testing, clear the internal instance cache."""
1989 cls._instances_by_type_and_id.clear()
1990
1991
showardccbd6c52009-03-21 00:10:21 +00001992 def _fetch_row_from_db(self, row_id):
1993 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1994 rows = _db.execute(sql, (row_id,))
1995 if not rows:
showard76e29d12009-04-15 21:53:10 +00001996 raise DBError("row not found (table=%s, row id=%s)"
1997 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00001998 return rows[0]
1999
2000
showarda3c58572009-03-12 20:36:59 +00002001 def _assert_row_length(self, row):
2002 assert len(row) == len(self._fields), (
2003 "table = %s, row = %s/%d, fields = %s/%d" % (
2004 self.__table, row, len(row), self._fields, len(self._fields)))
2005
2006
2007 def _compare_fields_in_row(self, row):
2008 """
2009 Given a row as returned by a SELECT query, compare it to our existing
2010 in memory fields.
2011
2012 @param row - A sequence of values corresponding to fields named in
2013 The class attribute _fields.
2014
2015 @returns A dictionary listing the differences keyed by field name
2016 containing tuples of (current_value, row_value).
2017 """
2018 self._assert_row_length(row)
2019 differences = {}
2020 for field, row_value in itertools.izip(self._fields, row):
2021 current_value = getattr(self, field)
2022 if current_value != row_value:
2023 differences[field] = (current_value, row_value)
2024 return differences
showard2bab8f42008-11-12 18:15:22 +00002025
2026
2027 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002028 """
2029 Update our field attributes using a single row returned by SELECT.
2030
2031 @param row - A sequence of values corresponding to fields named in
2032 the class fields list.
2033 """
2034 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002035
showard2bab8f42008-11-12 18:15:22 +00002036 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002037 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002038 setattr(self, field, value)
2039 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002040
showard2bab8f42008-11-12 18:15:22 +00002041 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002042
mblighe2586682008-02-29 22:45:46 +00002043
showardccbd6c52009-03-21 00:10:21 +00002044 def update_from_database(self):
2045 assert self.id is not None
2046 row = self._fetch_row_from_db(self.id)
2047 self._update_fields_from_row(row)
2048
2049
jadmanski0afbb632008-06-06 21:10:57 +00002050 def count(self, where, table = None):
2051 if not table:
2052 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002053
jadmanski0afbb632008-06-06 21:10:57 +00002054 rows = _db.execute("""
2055 SELECT count(*) FROM %s
2056 WHERE %s
2057 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002058
jadmanski0afbb632008-06-06 21:10:57 +00002059 assert len(rows) == 1
2060
2061 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002062
2063
showardd3dc1992009-04-22 21:01:40 +00002064 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002065 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002066
showard2bab8f42008-11-12 18:15:22 +00002067 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002068 return
mbligh36768f02008-02-22 18:28:33 +00002069
mblighf8c624d2008-07-03 16:58:45 +00002070 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002071 _db.execute(query, (value, self.id))
2072
showard2bab8f42008-11-12 18:15:22 +00002073 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002074
2075
jadmanski0afbb632008-06-06 21:10:57 +00002076 def save(self):
2077 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002078 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002079 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002080 values = []
2081 for key in keys:
2082 value = getattr(self, key)
2083 if value is None:
2084 values.append('NULL')
2085 else:
2086 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002087 values_str = ','.join(values)
2088 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2089 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002090 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002091 # Update our id to the one the database just assigned to us.
2092 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002093
2094
jadmanski0afbb632008-06-06 21:10:57 +00002095 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002096 self._instances_by_type_and_id.pop((type(self), id), None)
2097 self._initialized = False
2098 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002099 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2100 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002101
2102
showard63a34772008-08-18 19:32:50 +00002103 @staticmethod
2104 def _prefix_with(string, prefix):
2105 if string:
2106 string = prefix + string
2107 return string
2108
2109
jadmanski0afbb632008-06-06 21:10:57 +00002110 @classmethod
showard989f25d2008-10-01 11:38:11 +00002111 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002112 """
2113 Construct instances of our class based on the given database query.
2114
2115 @yields One class instance for each row fetched.
2116 """
showard63a34772008-08-18 19:32:50 +00002117 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2118 where = cls._prefix_with(where, 'WHERE ')
2119 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002120 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002121 'joins' : joins,
2122 'where' : where,
2123 'order_by' : order_by})
2124 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002125 for row in rows:
2126 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002127
mbligh36768f02008-02-22 18:28:33 +00002128
class IneligibleHostQueue(DBObject):
    """A (job, host) pair recording that the host is blocked for that job."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002132
2133
showard89f84db2009-03-12 20:39:13 +00002134class AtomicGroup(DBObject):
2135 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002136 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2137 'invalid')
showard89f84db2009-03-12 20:39:13 +00002138
2139
showard989f25d2008-10-01 11:38:11 +00002140class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002141 _table_name = 'labels'
2142 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002143 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002144
2145
mbligh36768f02008-02-22 18:28:33 +00002146class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002147 _table_name = 'hosts'
2148 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2149 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2150
2151
jadmanski0afbb632008-06-06 21:10:57 +00002152 def current_task(self):
2153 rows = _db.execute("""
2154 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2155 """, (self.id,))
2156
2157 if len(rows) == 0:
2158 return None
2159 else:
2160 assert len(rows) == 1
2161 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002162 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002163
2164
jadmanski0afbb632008-06-06 21:10:57 +00002165 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002166 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002167 if self.current_task():
2168 self.current_task().requeue()
2169
showard6ae5ea92009-02-25 00:11:51 +00002170
jadmanski0afbb632008-06-06 21:10:57 +00002171 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002172 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002173 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002174
2175
showard170873e2009-01-07 00:22:26 +00002176 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002177 """
showard170873e2009-01-07 00:22:26 +00002178 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002179 """
2180 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002181 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002182 FROM labels
2183 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002184 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002185 ORDER BY labels.name
2186 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002187 platform = None
2188 all_labels = []
2189 for label_name, is_platform in rows:
2190 if is_platform:
2191 platform = label_name
2192 all_labels.append(label_name)
2193 return platform, all_labels
2194
2195
2196 def reverify_tasks(self):
2197 cleanup_task = CleanupTask(host=self)
2198 verify_task = VerifyTask(host=self)
2199 # just to make sure this host does not get taken away
2200 self.set_status('Cleaning')
2201 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002202
2203
mbligh36768f02008-02-22 18:28:33 +00002204class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002205 _table_name = 'host_queue_entries'
2206 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002207 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002208 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002209
2210
showarda3c58572009-03-12 20:36:59 +00002211 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002212 assert id or row
showarda3c58572009-03-12 20:36:59 +00002213 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002214 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002215
jadmanski0afbb632008-06-06 21:10:57 +00002216 if self.host_id:
2217 self.host = Host(self.host_id)
2218 else:
2219 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002220
showard170873e2009-01-07 00:22:26 +00002221 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002222 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002223
2224
showard89f84db2009-03-12 20:39:13 +00002225 @classmethod
2226 def clone(cls, template):
2227 """
2228 Creates a new row using the values from a template instance.
2229
2230 The new instance will not exist in the database or have a valid
2231 id attribute until its save() method is called.
2232 """
2233 assert isinstance(template, cls)
2234 new_row = [getattr(template, field) for field in cls._fields]
2235 clone = cls(row=new_row, new_record=True)
2236 clone.id = None
2237 return clone
2238
2239
showardc85c21b2008-11-24 22:17:37 +00002240 def _view_job_url(self):
2241 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2242
2243
jadmanski0afbb632008-06-06 21:10:57 +00002244 def set_host(self, host):
2245 if host:
2246 self.queue_log_record('Assigning host ' + host.hostname)
2247 self.update_field('host_id', host.id)
2248 self.update_field('active', True)
2249 self.block_host(host.id)
2250 else:
2251 self.queue_log_record('Releasing host')
2252 self.unblock_host(self.host.id)
2253 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002254
jadmanski0afbb632008-06-06 21:10:57 +00002255 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002256
2257
jadmanski0afbb632008-06-06 21:10:57 +00002258 def get_host(self):
2259 return self.host
mbligh36768f02008-02-22 18:28:33 +00002260
2261
jadmanski0afbb632008-06-06 21:10:57 +00002262 def queue_log_record(self, log_line):
2263 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002264 _drone_manager.write_lines_to_file(self.queue_log_path,
2265 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002266
2267
jadmanski0afbb632008-06-06 21:10:57 +00002268 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002269 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002270 row = [0, self.job.id, host_id]
2271 block = IneligibleHostQueue(row=row, new_record=True)
2272 block.save()
mblighe2586682008-02-29 22:45:46 +00002273
2274
jadmanski0afbb632008-06-06 21:10:57 +00002275 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002276 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002277 blocks = IneligibleHostQueue.fetch(
2278 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2279 for block in blocks:
2280 block.delete()
mblighe2586682008-02-29 22:45:46 +00002281
2282
showard2bab8f42008-11-12 18:15:22 +00002283 def set_execution_subdir(self, subdir=None):
2284 if subdir is None:
2285 assert self.get_host()
2286 subdir = self.get_host().hostname
2287 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002288
2289
showard6355f6b2008-12-05 18:52:13 +00002290 def _get_hostname(self):
2291 if self.host:
2292 return self.host.hostname
2293 return 'no host'
2294
2295
showard170873e2009-01-07 00:22:26 +00002296 def __str__(self):
2297 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2298
2299
jadmanski0afbb632008-06-06 21:10:57 +00002300 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002301 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002302
showardb18134f2009-03-20 20:52:18 +00002303 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002304
showardc85c21b2008-11-24 22:17:37 +00002305 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002306 self.update_field('complete', False)
2307 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002308
jadmanski0afbb632008-06-06 21:10:57 +00002309 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002310 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002311 self.update_field('complete', False)
2312 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002313
showardc85c21b2008-11-24 22:17:37 +00002314 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002315 self.update_field('complete', True)
2316 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002317
2318 should_email_status = (status.lower() in _notify_email_statuses or
2319 'all' in _notify_email_statuses)
2320 if should_email_status:
2321 self._email_on_status(status)
2322
2323 self._email_on_job_complete()
2324
2325
2326 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002327 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002328
2329 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2330 self.job.id, self.job.name, hostname, status)
2331 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2332 self.job.id, self.job.name, hostname, status,
2333 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002334 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002335
2336
2337 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002338 if not self.job.is_finished():
2339 return
showard542e8402008-09-19 20:16:18 +00002340
showardc85c21b2008-11-24 22:17:37 +00002341 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002342 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002343 for queue_entry in hosts_queue:
2344 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002345 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002346 queue_entry.status))
2347
2348 summary_text = "\n".join(summary_text)
2349 status_counts = models.Job.objects.get_status_counts(
2350 [self.job.id])[self.job.id]
2351 status = ', '.join('%d %s' % (count, status) for status, count
2352 in status_counts.iteritems())
2353
2354 subject = 'Autotest: Job ID: %s "%s" %s' % (
2355 self.job.id, self.job.name, status)
2356 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2357 self.job.id, self.job.name, status, self._view_job_url(),
2358 summary_text)
showard170873e2009-01-07 00:22:26 +00002359 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002360
2361
showard89f84db2009-03-12 20:39:13 +00002362 def run(self, assigned_host=None):
2363 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002364 assert assigned_host
2365 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002366 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002367
showardb18134f2009-03-20 20:52:18 +00002368 logging.info("%s/%s/%s scheduled on %s, status=%s",
2369 self.job.name, self.meta_host, self.atomic_group_id,
2370 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002371
jadmanski0afbb632008-06-06 21:10:57 +00002372 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002373
showard6ae5ea92009-02-25 00:11:51 +00002374
jadmanski0afbb632008-06-06 21:10:57 +00002375 def requeue(self):
2376 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002377 # verify/cleanup failure sets the execution subdir, so reset it here
2378 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002379 if self.meta_host:
2380 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002381
2382
jadmanski0afbb632008-06-06 21:10:57 +00002383 def handle_host_failure(self):
2384 """\
2385 Called when this queue entry's host has failed verification and
2386 repair.
2387 """
2388 assert not self.meta_host
2389 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002390 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002391
2392
jadmanskif7fa2cc2008-10-01 14:13:23 +00002393 @property
2394 def aborted_by(self):
2395 self._load_abort_info()
2396 return self._aborted_by
2397
2398
2399 @property
2400 def aborted_on(self):
2401 self._load_abort_info()
2402 return self._aborted_on
2403
2404
2405 def _load_abort_info(self):
2406 """ Fetch info about who aborted the job. """
2407 if hasattr(self, "_aborted_by"):
2408 return
2409 rows = _db.execute("""
2410 SELECT users.login, aborted_host_queue_entries.aborted_on
2411 FROM aborted_host_queue_entries
2412 INNER JOIN users
2413 ON users.id = aborted_host_queue_entries.aborted_by_id
2414 WHERE aborted_host_queue_entries.queue_entry_id = %s
2415 """, (self.id,))
2416 if rows:
2417 self._aborted_by, self._aborted_on = rows[0]
2418 else:
2419 self._aborted_by = self._aborted_on = None
2420
2421
showardb2e2c322008-10-14 17:33:55 +00002422 def on_pending(self):
2423 """
2424 Called when an entry in a synchronous job has passed verify. If the
2425 job is ready to run, returns an agent to run the job. Returns None
2426 otherwise.
2427 """
2428 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002429 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002430 if self.job.is_ready():
2431 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002432 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002433 return None
2434
2435
showardd3dc1992009-04-22 21:01:40 +00002436 def abort(self, dispatcher):
2437 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002438
showardd3dc1992009-04-22 21:01:40 +00002439 Status = models.HostQueueEntry.Status
2440 has_running_job_agent = (
2441 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2442 and dispatcher.get_agents_for_entry(self))
2443 if has_running_job_agent:
2444 # do nothing; post-job tasks will finish and then mark this entry
2445 # with status "Aborted" and take care of the host
2446 return
2447
2448 if self.status in (Status.STARTING, Status.PENDING):
2449 self.host.set_status(models.Host.Status.READY)
2450 elif self.status == Status.VERIFYING:
2451 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2452
2453 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002454
2455 def execution_tag(self):
2456 assert self.execution_subdir
2457 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002458
2459
mbligh36768f02008-02-22 18:28:33 +00002460class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002461 _table_name = 'jobs'
2462 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2463 'control_type', 'created_on', 'synch_count', 'timeout',
2464 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2465
2466
showarda3c58572009-03-12 20:36:59 +00002467 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002468 assert id or row
showarda3c58572009-03-12 20:36:59 +00002469 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002470
mblighe2586682008-02-29 22:45:46 +00002471
jadmanski0afbb632008-06-06 21:10:57 +00002472 def is_server_job(self):
2473 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002474
2475
showard170873e2009-01-07 00:22:26 +00002476 def tag(self):
2477 return "%s-%s" % (self.id, self.owner)
2478
2479
jadmanski0afbb632008-06-06 21:10:57 +00002480 def get_host_queue_entries(self):
2481 rows = _db.execute("""
2482 SELECT * FROM host_queue_entries
2483 WHERE job_id= %s
2484 """, (self.id,))
2485 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002486
jadmanski0afbb632008-06-06 21:10:57 +00002487 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002488
jadmanski0afbb632008-06-06 21:10:57 +00002489 return entries
mbligh36768f02008-02-22 18:28:33 +00002490
2491
jadmanski0afbb632008-06-06 21:10:57 +00002492 def set_status(self, status, update_queues=False):
2493 self.update_field('status',status)
2494
2495 if update_queues:
2496 for queue_entry in self.get_host_queue_entries():
2497 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002498
2499
jadmanski0afbb632008-06-06 21:10:57 +00002500 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002501 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2502 status='Pending')
2503 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002504
2505
jadmanski0afbb632008-06-06 21:10:57 +00002506 def num_machines(self, clause = None):
2507 sql = "job_id=%s" % self.id
2508 if clause:
2509 sql += " AND (%s)" % clause
2510 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002511
2512
jadmanski0afbb632008-06-06 21:10:57 +00002513 def num_queued(self):
2514 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002515
2516
jadmanski0afbb632008-06-06 21:10:57 +00002517 def num_active(self):
2518 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002519
2520
jadmanski0afbb632008-06-06 21:10:57 +00002521 def num_complete(self):
2522 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002523
2524
jadmanski0afbb632008-06-06 21:10:57 +00002525 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002526 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002527
mbligh36768f02008-02-22 18:28:33 +00002528
showard6bb7c292009-01-30 01:44:51 +00002529 def _not_yet_run_entries(self, include_verifying=True):
2530 statuses = [models.HostQueueEntry.Status.QUEUED,
2531 models.HostQueueEntry.Status.PENDING]
2532 if include_verifying:
2533 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2534 return models.HostQueueEntry.objects.filter(job=self.id,
2535 status__in=statuses)
2536
2537
2538 def _stop_all_entries(self):
2539 entries_to_stop = self._not_yet_run_entries(
2540 include_verifying=False)
2541 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002542 assert not child_entry.complete, (
2543 '%s status=%s, active=%s, complete=%s' %
2544 (child_entry.id, child_entry.status, child_entry.active,
2545 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002546 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2547 child_entry.host.status = models.Host.Status.READY
2548 child_entry.host.save()
2549 child_entry.status = models.HostQueueEntry.Status.STOPPED
2550 child_entry.save()
2551
showard2bab8f42008-11-12 18:15:22 +00002552 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002553 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002554 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002555 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002556
2557
jadmanski0afbb632008-06-06 21:10:57 +00002558 def write_to_machines_file(self, queue_entry):
2559 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002560 file_path = os.path.join(self.tag(), '.machines')
2561 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002562
2563
showard2bab8f42008-11-12 18:15:22 +00002564 def _next_group_name(self):
2565 query = models.HostQueueEntry.objects.filter(
2566 job=self.id).values('execution_subdir').distinct()
2567 subdirs = (entry['execution_subdir'] for entry in query)
2568 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2569 ids = [int(match.group(1)) for match in groups if match]
2570 if ids:
2571 next_id = max(ids) + 1
2572 else:
2573 next_id = 0
2574 return "group%d" % next_id
2575
2576
showard170873e2009-01-07 00:22:26 +00002577 def _write_control_file(self, execution_tag):
2578 control_path = _drone_manager.attach_file_to_execution(
2579 execution_tag, self.control_file)
2580 return control_path
mbligh36768f02008-02-22 18:28:33 +00002581
showardb2e2c322008-10-14 17:33:55 +00002582
showard2bab8f42008-11-12 18:15:22 +00002583 def get_group_entries(self, queue_entry_from_group):
2584 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002585 return list(HostQueueEntry.fetch(
2586 where='job_id=%s AND execution_subdir=%s',
2587 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002588
2589
showardb2e2c322008-10-14 17:33:55 +00002590 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002591 assert queue_entries
2592 execution_tag = queue_entries[0].execution_tag()
2593 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002594 hostnames = ','.join([entry.get_host().hostname
2595 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002596
showard87ba02a2009-04-20 19:37:32 +00002597 params = _autoserv_command_line(
2598 hostnames, execution_tag,
2599 ['-P', execution_tag, '-n',
2600 _drone_manager.absolute_path(control_path)],
2601 job=self)
mbligh36768f02008-02-22 18:28:33 +00002602
jadmanski0afbb632008-06-06 21:10:57 +00002603 if not self.is_server_job():
2604 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002605
showardb2e2c322008-10-14 17:33:55 +00002606 return params
mblighe2586682008-02-29 22:45:46 +00002607
mbligh36768f02008-02-22 18:28:33 +00002608
showardc9ae1782009-01-30 01:42:37 +00002609 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002610 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002611 return True
showard0fc38302008-10-23 00:44:07 +00002612 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002613 return queue_entry.get_host().dirty
2614 return False
showard21baa452008-10-21 00:08:39 +00002615
showardc9ae1782009-01-30 01:42:37 +00002616
2617 def _should_run_verify(self, queue_entry):
2618 do_not_verify = (queue_entry.host.protection ==
2619 host_protections.Protection.DO_NOT_VERIFY)
2620 if do_not_verify:
2621 return False
2622 return self.run_verify
2623
2624
2625 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002626 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002627 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002628 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002629 if self._should_run_verify(queue_entry):
2630 tasks.append(VerifyTask(queue_entry=queue_entry))
2631 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002632 return tasks
2633
2634
showard2bab8f42008-11-12 18:15:22 +00002635 def _assign_new_group(self, queue_entries):
2636 if len(queue_entries) == 1:
2637 group_name = queue_entries[0].get_host().hostname
2638 else:
2639 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002640 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002641 self.id, [entry.host.hostname for entry in queue_entries],
2642 group_name)
2643
2644 for queue_entry in queue_entries:
2645 queue_entry.set_execution_subdir(group_name)
2646
2647
2648 def _choose_group_to_run(self, include_queue_entry):
2649 chosen_entries = [include_queue_entry]
2650
2651 num_entries_needed = self.synch_count - 1
2652 if num_entries_needed > 0:
2653 pending_entries = HostQueueEntry.fetch(
2654 where='job_id = %s AND status = "Pending" AND id != %s',
2655 params=(self.id, include_queue_entry.id))
2656 chosen_entries += list(pending_entries)[:num_entries_needed]
2657
2658 self._assign_new_group(chosen_entries)
2659 return chosen_entries
2660
2661
2662 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002663 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002664 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2665 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002666
showard2bab8f42008-11-12 18:15:22 +00002667 queue_entries = self._choose_group_to_run(queue_entry)
2668 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002669
2670
2671 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002672 for queue_entry in queue_entries:
2673 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002674 params = self._get_autoserv_params(queue_entries)
2675 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2676 cmd=params)
2677 tasks = initial_tasks + [queue_task]
2678 entry_ids = [entry.id for entry in queue_entries]
2679
showard170873e2009-01-07 00:22:26 +00002680 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002681
2682
mbligh36768f02008-02-22 18:28:33 +00002683if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002684 main()