blob: ce1e431601636337698db8707b40f171bebb9fd4 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results directory; overwritten with the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
# niceness applied to autoserv child processes
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the path derived from this file's location.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into an execution directory for each phase of a job:
# the main autoserv run, crashinfo collection, and results parsing.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, filled in by init()/main_without_exception_handling().
_db = None                # DatabaseConnection to the AUTOTEST_WEB database
_shutdown = False         # set by the SIGINT handler to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False     # True when run with --test (dummy autoserv)
_base_url = None          # base URL of the AFE web frontend
_notify_email_statuses = []  # HQE statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The fileConfig file reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Entry point: run the scheduler and log any escaping exception.

    The bare except is deliberate: it logs *anything* (including
    SystemExit/KeyboardInterrupt) with a traceback, then re-raises so the
    process still terminates with the original failure.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse arguments, configure globals, and run the scheduler main loop.

    Startup order matters: option parsing, site hook, chdir to the results
    dir, config-derived globals, status server, then init() and the
    tick loop.  On any exception from the loop a stacktrace email is queued;
    shutdown always flushes email, stops the status server, shuts down the
    drone manager and disconnects the database.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook; falls back to the no-op dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value may be separated by whitespace, commas, semicolons or colons.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Report the failure by email; fall through to the common shutdown path.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit after the current tick completes."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """One-time scheduler startup: logging, pidfile, DB and drone manager.

    @param logfile - optional path; when set, stdout/stderr are redirected
            there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported 'drones' module
    # within this function - harmless here, but confusing.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect this process's stdout to logfile and stderr to logfile + '.err'.

    Both the Python file objects and the underlying OS descriptors are
    redirected, so output from child processes is captured as well.
    """
    out_path = logfile
    err_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_path, err_path)
    stdout_file = open(out_path, "a", buffering=0)
    stderr_file = open(err_path, "a", buffering=0)

    # Rewire the OS-level descriptors first so exec'd children inherit them.
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """Build the argv list used to launch autoserv.

    @param machines - comma-separated machine list passed to -m.
    @param results_dir - results directory, made absolute via the drone manager.
    @param extra_args - additional argv entries appended at the end.
    @param job, queue_entry - when either is given, owner/name flags are added
            (the job is taken from the queue entry if not passed explicitly).
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
234
235
class SchedulerError(Exception):
    """Raised by the host scheduler when an inconsistent state is detected."""
    pass
238
239
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against ready hosts.

    refresh() bulk-loads all scheduling state (available hosts, ACLs,
    labels, job dependencies) for one scheduling cycle; the find_eligible_*
    methods then consult and mutate that in-memory state as hosts are
    handed out, so each host is assigned at most once per cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id list as a comma-separated string for SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with a %s IN-list hole) and build a
        {left_id: set(right_id)} mapping; flip swaps the columns."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left_id: set(right_id)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_id)} via each job's owner's ACLs."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_id)} of hosts each job may not use."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_id)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_id)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for one scheduling cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the entry's job shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True if every only_if_needed label on the host was requested,
        either as a job dependency or as the entry's metahost label."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes all ACL, dependency, only_if_needed and
        atomic-group checks for the entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
584
585
showard170873e2009-01-07 00:22:26 +0000586class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000587 def __init__(self):
588 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000589 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000590 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000591 user_cleanup_time = scheduler_config.config.clean_interval
592 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
593 _db, user_cleanup_time)
594 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000595 self._host_agents = {}
596 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000597
mbligh36768f02008-02-22 18:28:33 +0000598
showard915958d2009-04-22 21:00:58 +0000599 def initialize(self, recover_hosts=True):
600 self._periodic_cleanup.initialize()
601 self._24hr_upkeep.initialize()
602
jadmanski0afbb632008-06-06 21:10:57 +0000603 # always recover processes
604 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000605
jadmanski0afbb632008-06-06 21:10:57 +0000606 if recover_hosts:
607 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000608
609
    def tick(self):
        """Run one scheduler iteration; the order of these steps matters."""
        _drone_manager.refresh()            # sync state from the drones first
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()    # flush queued drone operations
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000619
showard97aed502008-11-04 02:01:24 +0000620
    def _run_cleanup(self):
        # Both cleanup helpers rate-limit themselves ("maybe"), so calling
        # them every tick is cheap.
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000624
mbligh36768f02008-02-22 18:28:33 +0000625
showard170873e2009-01-07 00:22:26 +0000626 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
627 for object_id in object_ids:
628 agent_dict.setdefault(object_id, set()).add(agent)
629
630
631 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
632 for object_id in object_ids:
633 assert object_id in agent_dict
634 agent_dict[object_id].remove(agent)
635
636
jadmanski0afbb632008-06-06 21:10:57 +0000637 def add_agent(self, agent):
638 self._agents.append(agent)
639 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000640 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
641 self._register_agent_for_ids(self._queue_entry_agents,
642 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000643
showard170873e2009-01-07 00:22:26 +0000644
645 def get_agents_for_entry(self, queue_entry):
646 """
647 Find agents corresponding to the specified queue_entry.
648 """
showardd3dc1992009-04-22 21:01:40 +0000649 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000650
651
652 def host_has_agent(self, host):
653 """
654 Determine if there is currently an Agent present using this host.
655 """
656 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def remove_agent(self, agent):
660 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000661 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
662 agent)
663 self._unregister_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000665
666
    def _recover_processes(self):
        """Re-attach to processes left over from a previous scheduler run.

        Registers all candidate pidfiles, refreshes drone state, recovers
        entries whose processes still exist, and requeues the rest.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000677
showard170873e2009-01-07 00:22:26 +0000678
    def _register_pidfiles(self):
        """Register every pidfile that recovery might need to read."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # Register all three phase pidfiles; a given entry may have any
            # subset of them depending on how far its execution got.
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000689
690
showardd3dc1992009-04-22 21:01:40 +0000691 def _recover_entries_with_status(self, status, orphans, pidfile_name,
692 recover_entries_fn):
693 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000694 for queue_entry in queue_entries:
695 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000696 # synchronous job we've already recovered
697 continue
showardd3dc1992009-04-22 21:01:40 +0000698 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000699 execution_tag = queue_entry.execution_tag()
700 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000701 run_monitor.attach_to_existing_process(execution_tag,
702 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000703 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000704 # execution apparently never happened
705 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000706 continue
mbligh90a549d2008-03-25 23:52:34 +0000707
showardd3dc1992009-04-22 21:01:40 +0000708 logging.info('Recovering %s entry %s (process %s)',
709 status.lower(),
710 ', '.join(str(entry) for entry in queue_entries),
711 run_monitor.get_process())
712 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
713 orphans.discard(run_monitor.get_process())
714
715
716 def _kill_remaining_orphan_processes(self, orphans):
717 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000718 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000719 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000720
showard170873e2009-01-07 00:22:26 +0000721
showardd3dc1992009-04-22 21:01:40 +0000722 def _recover_running_entries(self, orphans):
723 def recover_entries(job, queue_entries, run_monitor):
724 if run_monitor is not None:
725 queue_task = RecoveryQueueTask(job=job,
726 queue_entries=queue_entries,
727 run_monitor=run_monitor)
728 self.add_agent(Agent(tasks=[queue_task],
729 num_processes=len(queue_entries)))
730 # else, _requeue_other_active_entries will cover this
731
732 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
733 orphans, '.autoserv_execute',
734 recover_entries)
735
736
737 def _recover_gathering_entries(self, orphans):
738 def recover_entries(job, queue_entries, run_monitor):
739 gather_task = GatherLogsTask(job, queue_entries,
740 run_monitor=run_monitor)
741 self.add_agent(Agent([gather_task]))
742
743 self._recover_entries_with_status(
744 models.HostQueueEntry.Status.GATHERING,
745 orphans, _CRASHINFO_PID_FILE, recover_entries)
746
747
748 def _recover_parsing_entries(self, orphans):
749 def recover_entries(job, queue_entries, run_monitor):
750 reparse_task = FinalReparseTask(queue_entries,
751 run_monitor=run_monitor)
752 self.add_agent(Agent([reparse_task], num_processes=0))
753
754 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
755 orphans, _PARSER_PID_FILE,
756 recover_entries)
757
758
    def _recover_all_recoverable_entries(self):
        """
        Re-attach agents to entries in Running, Gathering or Parsing state,
        then kill any orphaned autoserv process that nothing claimed.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        # each recovery step discards the processes it adopts from `orphans`,
        # so the kill below only sees truly unclaimed processes
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000765
showard97aed502008-11-04 02:01:24 +0000766
showard170873e2009-01-07 00:22:26 +0000767 def _requeue_other_active_entries(self):
768 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000769 where='active AND NOT complete AND '
770 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000771 for queue_entry in queue_entries:
772 if self.get_agents_for_entry(queue_entry):
773 # entry has already been recovered
774 continue
showardd3dc1992009-04-22 21:01:40 +0000775 if queue_entry.aborted:
776 queue_entry.abort(self)
777 continue
778
779 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000780 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000781 if queue_entry.host:
782 tasks = queue_entry.host.reverify_tasks()
783 self.add_agent(Agent(tasks))
784 agent = queue_entry.requeue()
785
786
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000802
803
jadmanski0afbb632008-06-06 21:10:57 +0000804 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000805 print_message='Reverifying host %s'):
806 full_where='locked = 0 AND invalid = 0 AND ' + where
807 for host in Host.fetch(where=full_where):
808 if self.host_has_agent(host):
809 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000810 continue
showard170873e2009-01-07 00:22:26 +0000811 if print_message:
showardb18134f2009-03-20 20:52:18 +0000812 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000813 tasks = host.reverify_tasks()
814 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000815
816
jadmanski0afbb632008-06-06 21:10:57 +0000817 def _recover_hosts(self):
818 # recover "Repair Failed" hosts
819 message = 'Reverifying dead host %s'
820 self._reverify_hosts_where("status = 'Repair Failed'",
821 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000822
823
showard04c82c52008-05-29 19:38:12 +0000824
showardb95b1bd2008-08-15 18:11:04 +0000825 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000826 # prioritize by job priority, then non-metahost over metahost, then FIFO
827 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000828 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000829 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000830 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000831
832
showard89f84db2009-03-12 20:39:13 +0000833 def _refresh_pending_queue_entries(self):
834 """
835 Lookup the pending HostQueueEntries and call our HostScheduler
836 refresh() method given that list. Return the list.
837
838 @returns A list of pending HostQueueEntries sorted in priority order.
839 """
showard63a34772008-08-18 19:32:50 +0000840 queue_entries = self._get_pending_queue_entries()
841 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000842 return []
showardb95b1bd2008-08-15 18:11:04 +0000843
showard63a34772008-08-18 19:32:50 +0000844 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000845
showard89f84db2009-03-12 20:39:13 +0000846 return queue_entries
847
848
849 def _schedule_atomic_group(self, queue_entry):
850 """
851 Schedule the given queue_entry on an atomic group of hosts.
852
853 Returns immediately if there are insufficient available hosts.
854
855 Creates new HostQueueEntries based off of queue_entry for the
856 scheduled hosts and starts them all running.
857 """
858 # This is a virtual host queue entry representing an entire
859 # atomic group, find a group and schedule their hosts.
860 group_hosts = self._host_scheduler.find_eligible_atomic_group(
861 queue_entry)
862 if not group_hosts:
863 return
864 # The first assigned host uses the original HostQueueEntry
865 group_queue_entries = [queue_entry]
866 for assigned_host in group_hosts[1:]:
867 # Create a new HQE for every additional assigned_host.
868 new_hqe = HostQueueEntry.clone(queue_entry)
869 new_hqe.save()
870 group_queue_entries.append(new_hqe)
871 assert len(group_queue_entries) == len(group_hosts)
872 for queue_entry, host in itertools.izip(group_queue_entries,
873 group_hosts):
874 self._run_queue_entry(queue_entry, host)
875
876
877 def _schedule_new_jobs(self):
878 queue_entries = self._refresh_pending_queue_entries()
879 if not queue_entries:
880 return
881
showard63a34772008-08-18 19:32:50 +0000882 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000883 if (queue_entry.atomic_group_id is None or
884 queue_entry.host_id is not None):
885 assigned_host = self._host_scheduler.find_eligible_host(
886 queue_entry)
887 if assigned_host:
888 self._run_queue_entry(queue_entry, assigned_host)
889 else:
890 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000891
892
893 def _run_queue_entry(self, queue_entry, host):
894 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000895 # in some cases (synchronous jobs with run_verify=False), agent may be
896 # None
showard9976ce92008-10-15 20:28:13 +0000897 if agent:
898 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000899
900
jadmanski0afbb632008-06-06 21:10:57 +0000901 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000902 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
903 for agent in self.get_agents_for_entry(entry):
904 agent.abort()
905 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000906
907
showard324bf812009-01-20 23:23:38 +0000908 def _can_start_agent(self, agent, num_started_this_cycle,
909 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000910 # always allow zero-process agents to run
911 if agent.num_processes == 0:
912 return True
913 # don't allow any nonzero-process agents to run after we've reached a
914 # limit (this avoids starvation of many-process agents)
915 if have_reached_limit:
916 return False
917 # total process throttling
showard324bf812009-01-20 23:23:38 +0000918 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000919 return False
920 # if a single agent exceeds the per-cycle throttling, still allow it to
921 # run when it's the first agent in the cycle
922 if num_started_this_cycle == 0:
923 return True
924 # per-cycle throttling
925 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000926 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000927 return False
928 return True
929
930
jadmanski0afbb632008-06-06 21:10:57 +0000931 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000932 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000933 have_reached_limit = False
934 # iterate over copy, so we can remove agents during iteration
935 for agent in list(self._agents):
936 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000937 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000938 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000939 continue
940 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000941 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000942 have_reached_limit):
943 have_reached_limit = True
944 continue
showard4c5374f2008-09-04 17:02:56 +0000945 num_started_this_cycle += agent.num_processes
946 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000947 logging.info('%d running processes',
948 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000949
950
    def _process_recurring_runs(self):
        """
        Create a job from every recurring run whose start date has passed,
        then reschedule or delete each run based on its remaining loop count.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts must be freshly created for each new job
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         name=job.name,
                                         priority=job.priority,
                                         control_file=job.control_file,
                                         control_type=job.control_type,
                                         is_template=False,
                                         synch_count=job.synch_count,
                                         timeout=job.timeout,
                                         run_verify=job.run_verify,
                                         email_list=job.email_list,
                                         dependencies=dependencies,
                                         reboot_before=job.reboot_before,
                                         reboot_after=job.reboot_after,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a failure to create one job must not block the other runs
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration -- the recurring run is finished
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                rrun.save()
1000
1001
class PidfileRunMonitor(object):
    """
    Tracks one autoserv-style process through its pidfile, via the drone
    manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set to True once we give up on the process (see on_lost_process)
        self.lost_process = False
        # set when run()/attach_to_existing_process() is called; used for
        # the pidfile-write timeout in _handle_no_process()
        self._start_time = None
        self.pidfile_id = None
        # last pidfile contents read (process, exit_status, num_tests_failed)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): run() below inlines equivalent logic (with a slightly
        # different check: `is not None` vs falsiness) instead of calling this
        # helper; this method appears unused within this class -- confirm
        # before removing or unifying
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start `command` (argv list) through the drone manager, optionally
        wrapped in `nice`, and remember the resulting pidfile id.
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """
        Monitor an already-running process via its existing pidfile
        (used during recovery after a scheduler restart).
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        # killing is a no-op when no process was ever found
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True once the pidfile has yielded a process."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid if has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # refresh self._state from the pidfile; raises _PidfileException on
        # unparseable contents (and resets the state first)
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log + email the problem, then declare the process lost so callers
        # see a failure exit status
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # once lost, the synthetic failure state is final -- don't re-read
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Only gives up (and emails) once PIDFILE_TIMEOUT has elapsed since
        the monitor was started; before that it's normal startup latency.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # synthesize a failing exit status so callers treat the task as done
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001159
1160
class Agent(object):
    """
    A FIFO queue of AgentTasks executed one at a time by the dispatcher.

    host_ids and queue_entry_ids are the unions of the corresponding id
    lists of all tasks; the dispatcher indexes agents by them.
    num_processes is this agent's weight against the dispatcher's process
    throttling (see Dispatcher._can_start_agent).
    """
    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: AgentTasks to run, in order.
        @param num_processes: process count used for dispatcher throttling.
        """
        self.active_task = None
        # placeholder only; _clear_queue() below creates the real queue
        self.queue = None
        # NOTE(review): assigned externally, presumably when the dispatcher
        # adds this agent -- confirm against Dispatcher.add_agent
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # replace the task queue with a fresh, empty one
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        # flatten an iterable of id lists into a single set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # enqueue the task and point it back at this agent
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """
        Poll the active task, advancing through queued tasks until one is
        still in progress or all are done.
        """
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """
        Retire the finished active task (running its failure tasks if it
        failed) and start the next queued task, if any.
        """
        logging.info("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        # a failure cancels all remaining queued tasks
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        # "running" == a task is currently active
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher
        self._next_task()


    def abort(self):
        """
        Abort the active task and, unless the task chose to ignore the
        abort, drop all remaining queued tasks.
        """
        if self.active_task:
            self.active_task.abort()
            if not self.active_task.aborted:  # tasks can choose to ignore aborts
                return
            self.active_task = None

        self._clear_queue()
1242
showardd3dc1992009-04-22 21:01:40 +00001243
1244
class AgentTask(object):
    """
    A single unit of work executed by an Agent, usually wrapping one
    autoserv process driven through a PidfileRunMonitor.

    Subclasses override prolog()/epilog() for setup and teardown; poll()
    advances the task by checking the monitored process's exit status.
    If the task fails, its failure_tasks are run in a fresh Agent (see
    Agent.on_task_failure).
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to execute, or None for tasks that
                run no process.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks to schedule if this task fails.
        @param pidfile_name: accepted for subclass compatibility; run()
                takes its own pidfile argument.
        @param paired_with_pidfile: accepted for subclass compatibility.
        """
        # bug fix: failure_tasks previously defaulted to a mutable [],
        # which would be shared by every instance using the default
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task operates on."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """
        Advance the task: check the monitored process's exit code, or fail
        immediately when there is no monitor to poll.
        """
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """
        Finish the task once the process has exited; exit code 0 means
        success.  A None exit_code means the process is still running.
        """
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        """Allocate a temporary results directory via the drone manager."""
        # NOTE(review): `suffix` is accepted (callers pass e.g. '.repair')
        # but never used -- the drone manager chooses the path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """
        Copy this task's log file to the results repository, provided a
        process actually ran and a log file was configured.
        """
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses override."""
        self.cleanup()


    def start(self):
        """
        Run prolog() and launch the command the first time this is called;
        later calls only (re)mark the task as started.
        """
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any) and mark the task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point self.log_file at hosts/<hostname>/<timestamp>.<base_name>."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """
        Return the execution tag shared by all given entries, asserting
        that they actually agree.
        """
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """
        Copy the entries' results to the results repository and schedule a
        final reparse of them.

        @param use_monitor: monitor whose process the results are copied
                from; defaults to self.monitor.
        """
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)

        # reparse agents take zero process slots, so they are never throttled
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if any) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001375
1376
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single key/value pair in keyval-file syntax."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override."""
        # bug fix: the original raised the NotImplemented singleton, which is
        # not an exception (raising it produces a confusing TypeError);
        # NotImplementedError is the correct abstract-method sentinel
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """
        Append one keyval line to the job's keyval file, paired with the
        monitored process so it is written next to the job's results.
        Silently does nothing if no process ever ran.
        """
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <epoch seconds>) keyval for a job."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time in its keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1404
1405
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host, optionally failing a queue entry
    if the repair itself fails."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair output goes to a temp dir; it's only copied into the job's
        # results if the repair fails and an entry must be failed
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue immediately so the entry can be rescheduled elsewhere
        # while this host is being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live alongside the temporary repair results
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the associated queue entry failed and publish repair logs
        as that entry's job results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001474
1475
showard8fe93b52008-11-18 17:53:22 +00001476class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001477 def epilog(self):
1478 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001479 should_copy_results = (self.queue_entry and not self.success
1480 and not self.queue_entry.meta_host)
1481 if should_copy_results:
1482 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001483 destination = os.path.join(self.queue_entry.execution_tag(),
1484 os.path.basename(self.log_file))
1485 _drone_manager.copy_to_results_repository(
1486 self.monitor.get_process(), self.log_file,
1487 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001488
1489
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host; a RepairTask is queued on failure."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be provided
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        super(VerifyTask, self).__init__(
                command, self.temp_results_dir,
                failure_tasks=[RepairTask(self.host, queue_entry=queue_entry)])

        self.set_host_log_file('verify', self.host)
        # deliberately pass the raw 'host' argument (may be None) here
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
1521
showardd9205182009-04-27 20:09:55 +00001522class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001523 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001524 self.job = job
1525 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001526 super(QueueTask, self).__init__(cmd, self._execution_tag())
1527 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001528
1529
showard73ec0442009-02-07 02:05:20 +00001530 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001531 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001532
1533
1534 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1535 keyval_contents = '\n'.join(self._format_keyval(key, value)
1536 for key, value in keyval_dict.iteritems())
1537 # always end with a newline to allow additional keyvals to be written
1538 keyval_contents += '\n'
1539 _drone_manager.attach_file_to_execution(self._execution_tag(),
1540 keyval_contents,
1541 file_path=keyval_path)
1542
1543
1544 def _write_keyvals_before_job(self, keyval_dict):
1545 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1546
1547
showard170873e2009-01-07 00:22:26 +00001548 def _write_host_keyvals(self, host):
1549 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1550 host.hostname)
1551 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001552 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1553 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001554
1555
showard170873e2009-01-07 00:22:26 +00001556 def _execution_tag(self):
1557 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001558
1559
jadmanski0afbb632008-06-06 21:10:57 +00001560 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001561 queued_key, queued_time = self._job_queued_keyval(self.job)
1562 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001563 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001564 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001565 queue_entry.set_status('Running')
1566 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001567 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001568 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001569 assert len(self.queue_entries) == 1
1570 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001571
1572
showard35162b02009-03-03 02:17:30 +00001573 def _write_lost_process_error_file(self):
1574 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1575 _drone_manager.write_lines_to_file(error_file_path,
1576 [_LOST_PROCESS_ERROR])
1577
1578
showardd3dc1992009-04-22 21:01:40 +00001579 def _finish_task(self):
showardd9205182009-04-27 20:09:55 +00001580 self._write_job_finished()
1581
showardd3dc1992009-04-22 21:01:40 +00001582 # both of these conditionals can be true, iff the process ran, wrote a
1583 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001584 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001585 gather_task = GatherLogsTask(self.job, self.queue_entries)
1586 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001587
1588 if self.monitor.lost_process:
1589 self._write_lost_process_error_file()
1590 for queue_entry in self.queue_entries:
1591 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001592
1593
showardcbd74612008-11-19 21:42:02 +00001594 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001595 _drone_manager.write_lines_to_file(
1596 os.path.join(self._execution_tag(), 'status.log'),
1597 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001598 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001599
1600
jadmanskif7fa2cc2008-10-01 14:13:23 +00001601 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001602 if not self.monitor or not self.monitor.has_process():
1603 return
1604
jadmanskif7fa2cc2008-10-01 14:13:23 +00001605 # build up sets of all the aborted_by and aborted_on values
1606 aborted_by, aborted_on = set(), set()
1607 for queue_entry in self.queue_entries:
1608 if queue_entry.aborted_by:
1609 aborted_by.add(queue_entry.aborted_by)
1610 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1611 aborted_on.add(t)
1612
1613 # extract some actual, unique aborted by value and write it out
1614 assert len(aborted_by) <= 1
1615 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001616 aborted_by_value = aborted_by.pop()
1617 aborted_on_value = max(aborted_on)
1618 else:
1619 aborted_by_value = 'autotest_system'
1620 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001621
showarda0382352009-02-11 23:36:43 +00001622 self._write_keyval_after_job("aborted_by", aborted_by_value)
1623 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001624
showardcbd74612008-11-19 21:42:02 +00001625 aborted_on_string = str(datetime.datetime.fromtimestamp(
1626 aborted_on_value))
1627 self._write_status_comment('Job aborted by %s on %s' %
1628 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001629
1630
jadmanski0afbb632008-06-06 21:10:57 +00001631 def abort(self):
1632 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001633 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001634 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001635
1636
jadmanski0afbb632008-06-06 21:10:57 +00001637 def epilog(self):
1638 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001639 self._finish_task()
1640 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001641
1642
mblighbb421852008-03-11 22:36:16 +00001643class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001644 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001645 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001646 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001647
1648
jadmanski0afbb632008-06-06 21:10:57 +00001649 def run(self):
1650 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001651
1652
jadmanski0afbb632008-06-06 21:10:57 +00001653 def prolog(self):
1654 # recovering an existing process - don't do prolog
1655 pass
mblighbb421852008-03-11 22:36:16 +00001656
1657
showardd3dc1992009-04-22 21:01:40 +00001658class PostJobTask(AgentTask):
1659 def __init__(self, queue_entries, pidfile_name, logfile_name,
1660 run_monitor=None):
1661 """
1662 If run_monitor != None, we're recovering a running task.
1663 """
1664 self._queue_entries = queue_entries
1665 self._pidfile_name = pidfile_name
1666 self._run_monitor = run_monitor
1667
1668 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1669 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1670 self._autoserv_monitor = PidfileRunMonitor()
1671 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1672 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1673
1674 if _testing_mode:
1675 command = 'true'
1676 else:
1677 command = self._generate_command(self._results_dir)
1678
1679 super(PostJobTask, self).__init__(cmd=command,
1680 working_directory=self._execution_tag)
1681
1682 self.log_file = os.path.join(self._execution_tag, logfile_name)
1683 self._final_status = self._determine_final_status()
1684
1685
1686 def _generate_command(self, results_dir):
1687 raise NotImplementedError('Subclasses must override this')
1688
1689
1690 def _job_was_aborted(self):
1691 was_aborted = None
1692 for queue_entry in self._queue_entries:
1693 queue_entry.update_from_database()
1694 if was_aborted is None: # first queue entry
1695 was_aborted = bool(queue_entry.aborted)
1696 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1697 email_manager.manager.enqueue_notify_email(
1698 'Inconsistent abort state',
1699 'Queue entries have inconsistent abort state: ' +
1700 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1701 # don't crash here, just assume true
1702 return True
1703 return was_aborted
1704
1705
1706 def _determine_final_status(self):
1707 if self._job_was_aborted():
1708 return models.HostQueueEntry.Status.ABORTED
1709
1710 # we'll use a PidfileRunMonitor to read the autoserv exit status
1711 if self._autoserv_monitor.exit_code() == 0:
1712 return models.HostQueueEntry.Status.COMPLETED
1713 return models.HostQueueEntry.Status.FAILED
1714
1715
1716 def run(self):
1717 if self._run_monitor is not None:
1718 self.monitor = self._run_monitor
1719 else:
1720 # make sure we actually have results to work with.
1721 # this should never happen in normal operation.
1722 if not self._autoserv_monitor.has_process():
1723 email_manager.manager.enqueue_notify_email(
1724 'No results in post-job task',
1725 'No results in post-job task at %s' %
1726 self._autoserv_monitor.pidfile_id)
1727 self.finished(False)
1728 return
1729
1730 super(PostJobTask, self).run(
1731 pidfile_name=self._pidfile_name,
1732 paired_with_pidfile=self._paired_with_pidfile)
1733
1734
1735 def _set_all_statuses(self, status):
1736 for queue_entry in self._queue_entries:
1737 queue_entry.set_status(status)
1738
1739
1740 def abort(self):
1741 # override AgentTask.abort() to avoid killing the process and ending
1742 # the task. post-job tasks continue when the job is aborted.
1743 pass
1744
1745
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in crashinfo-collection mode against every host
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Decide per-job whether hosts get a cleanup (reboot) or go
        straight back to Ready."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # aborted jobs may have left the host in an arbitrary state
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        if self._final_status == models.HostQueueEntry.Status.COMPLETED:
            # don't run at all if Autoserv exited successfully
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
1810
1811
showard8fe93b52008-11-18 17:53:22 +00001812class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001813 def __init__(self, host=None, queue_entry=None):
1814 assert bool(host) ^ bool(queue_entry)
1815 if queue_entry:
1816 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001817 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001818 self.host = host
showard170873e2009-01-07 00:22:26 +00001819
1820 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001821 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1822 ['--cleanup'],
1823 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001824 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001825 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1826 failure_tasks=[repair_task])
1827
1828 self._set_ids(host=host, queue_entries=[queue_entry])
1829 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001830
mblighd5c95802008-03-05 00:33:46 +00001831
jadmanski0afbb632008-06-06 21:10:57 +00001832 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001833 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001834 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001835 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001836
mblighd5c95802008-03-05 00:33:46 +00001837
showard21baa452008-10-21 00:08:39 +00001838 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001839 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001840 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001841 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001842 self.host.update_field('dirty', 0)
1843
1844
showardd3dc1992009-04-22 21:01:40 +00001845class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001846 _num_running_parses = 0
1847
showardd3dc1992009-04-22 21:01:40 +00001848 def __init__(self, queue_entries, run_monitor=None):
1849 super(FinalReparseTask, self).__init__(queue_entries,
1850 pidfile_name=_PARSER_PID_FILE,
1851 logfile_name='.parse.log',
1852 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001853 # don't use _set_ids, since we don't want to set the host_ids
1854 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001855 self._parse_started = False
1856
showard97aed502008-11-04 02:01:24 +00001857
1858 @classmethod
1859 def _increment_running_parses(cls):
1860 cls._num_running_parses += 1
1861
1862
1863 @classmethod
1864 def _decrement_running_parses(cls):
1865 cls._num_running_parses -= 1
1866
1867
1868 @classmethod
1869 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001870 return (cls._num_running_parses <
1871 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001872
1873
1874 def prolog(self):
1875 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001876 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001877
1878
1879 def epilog(self):
1880 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001881 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001882
1883
showardd3dc1992009-04-22 21:01:40 +00001884 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001885 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001886 results_dir]
showard97aed502008-11-04 02:01:24 +00001887
1888
1889 def poll(self):
1890 # override poll to keep trying to start until the parse count goes down
1891 # and we can, at which point we revert to default behavior
1892 if self._parse_started:
1893 super(FinalReparseTask, self).poll()
1894 else:
1895 self._try_starting_parse()
1896
1897
1898 def run(self):
1899 # override run() to not actually run unless we can
1900 self._try_starting_parse()
1901
1902
1903 def _try_starting_parse(self):
1904 if not self._can_run_new_parse():
1905 return
showard170873e2009-01-07 00:22:26 +00001906
showard97aed502008-11-04 02:01:24 +00001907 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001908 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001909
showard97aed502008-11-04 02:01:24 +00001910 self._increment_running_parses()
1911 self._parse_started = True
1912
1913
1914 def finished(self, success):
1915 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001916 if self._parse_started:
1917 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001918
1919
showardc9ae1782009-01-30 01:42:37 +00001920class SetEntryPendingTask(AgentTask):
1921 def __init__(self, queue_entry):
1922 super(SetEntryPendingTask, self).__init__(cmd='')
1923 self._queue_entry = queue_entry
1924 self._set_ids(queue_entries=[queue_entry])
1925
1926
1927 def run(self):
1928 agent = self._queue_entry.on_pending()
1929 if agent:
1930 self.agent.dispatcher.add_agent(agent)
1931 self.finished(True)
1932
1933
showarda3c58572009-03-12 20:36:59 +00001934class DBError(Exception):
1935 """Raised by the DBObject constructor when its select fails."""
1936
1937
mbligh36768f02008-02-22 18:28:33 +00001938class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001939 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001940
1941 # Subclasses MUST override these:
1942 _table_name = ''
1943 _fields = ()
1944
showarda3c58572009-03-12 20:36:59 +00001945 # A mapping from (type, id) to the instance of the object for that
1946 # particular id. This prevents us from creating new Job() and Host()
1947 # instances for every HostQueueEntry object that we instantiate as
1948 # multiple HQEs often share the same Job.
1949 _instances_by_type_and_id = weakref.WeakValueDictionary()
1950 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001951
showarda3c58572009-03-12 20:36:59 +00001952
1953 def __new__(cls, id=None, **kwargs):
1954 """
1955 Look to see if we already have an instance for this particular type
1956 and id. If so, use it instead of creating a duplicate instance.
1957 """
1958 if id is not None:
1959 instance = cls._instances_by_type_and_id.get((cls, id))
1960 if instance:
1961 return instance
1962 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1963
1964
1965 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001966 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001967 assert self._table_name, '_table_name must be defined in your class'
1968 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001969 if not new_record:
1970 if self._initialized and not always_query:
1971 return # We've already been initialized.
1972 if id is None:
1973 id = row[0]
1974 # Tell future constructors to use us instead of re-querying while
1975 # this instance is still around.
1976 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001977
showard6ae5ea92009-02-25 00:11:51 +00001978 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001979
jadmanski0afbb632008-06-06 21:10:57 +00001980 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001981
jadmanski0afbb632008-06-06 21:10:57 +00001982 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001983 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001984
showarda3c58572009-03-12 20:36:59 +00001985 if self._initialized:
1986 differences = self._compare_fields_in_row(row)
1987 if differences:
showard7629f142009-03-27 21:02:02 +00001988 logging.warn(
1989 'initialized %s %s instance requery is updating: %s',
1990 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001991 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001992 self._initialized = True
1993
1994
1995 @classmethod
1996 def _clear_instance_cache(cls):
1997 """Used for testing, clear the internal instance cache."""
1998 cls._instances_by_type_and_id.clear()
1999
2000
showardccbd6c52009-03-21 00:10:21 +00002001 def _fetch_row_from_db(self, row_id):
2002 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2003 rows = _db.execute(sql, (row_id,))
2004 if not rows:
showard76e29d12009-04-15 21:53:10 +00002005 raise DBError("row not found (table=%s, row id=%s)"
2006 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002007 return rows[0]
2008
2009
showarda3c58572009-03-12 20:36:59 +00002010 def _assert_row_length(self, row):
2011 assert len(row) == len(self._fields), (
2012 "table = %s, row = %s/%d, fields = %s/%d" % (
2013 self.__table, row, len(row), self._fields, len(self._fields)))
2014
2015
2016 def _compare_fields_in_row(self, row):
2017 """
2018 Given a row as returned by a SELECT query, compare it to our existing
2019 in memory fields.
2020
2021 @param row - A sequence of values corresponding to fields named in
2022 The class attribute _fields.
2023
2024 @returns A dictionary listing the differences keyed by field name
2025 containing tuples of (current_value, row_value).
2026 """
2027 self._assert_row_length(row)
2028 differences = {}
2029 for field, row_value in itertools.izip(self._fields, row):
2030 current_value = getattr(self, field)
2031 if current_value != row_value:
2032 differences[field] = (current_value, row_value)
2033 return differences
showard2bab8f42008-11-12 18:15:22 +00002034
2035
2036 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002037 """
2038 Update our field attributes using a single row returned by SELECT.
2039
2040 @param row - A sequence of values corresponding to fields named in
2041 the class fields list.
2042 """
2043 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002044
showard2bab8f42008-11-12 18:15:22 +00002045 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002046 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002047 setattr(self, field, value)
2048 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002049
showard2bab8f42008-11-12 18:15:22 +00002050 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002051
mblighe2586682008-02-29 22:45:46 +00002052
showardccbd6c52009-03-21 00:10:21 +00002053 def update_from_database(self):
2054 assert self.id is not None
2055 row = self._fetch_row_from_db(self.id)
2056 self._update_fields_from_row(row)
2057
2058
jadmanski0afbb632008-06-06 21:10:57 +00002059 def count(self, where, table = None):
2060 if not table:
2061 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002062
jadmanski0afbb632008-06-06 21:10:57 +00002063 rows = _db.execute("""
2064 SELECT count(*) FROM %s
2065 WHERE %s
2066 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002067
jadmanski0afbb632008-06-06 21:10:57 +00002068 assert len(rows) == 1
2069
2070 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002071
2072
showardd3dc1992009-04-22 21:01:40 +00002073 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002074 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002075
showard2bab8f42008-11-12 18:15:22 +00002076 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002077 return
mbligh36768f02008-02-22 18:28:33 +00002078
mblighf8c624d2008-07-03 16:58:45 +00002079 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002080 _db.execute(query, (value, self.id))
2081
showard2bab8f42008-11-12 18:15:22 +00002082 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002083
2084
jadmanski0afbb632008-06-06 21:10:57 +00002085 def save(self):
2086 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002087 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002088 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002089 values = []
2090 for key in keys:
2091 value = getattr(self, key)
2092 if value is None:
2093 values.append('NULL')
2094 else:
2095 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002096 values_str = ','.join(values)
2097 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2098 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002099 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002100 # Update our id to the one the database just assigned to us.
2101 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002102
2103
jadmanski0afbb632008-06-06 21:10:57 +00002104 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002105 self._instances_by_type_and_id.pop((type(self), id), None)
2106 self._initialized = False
2107 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002108 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2109 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002110
2111
showard63a34772008-08-18 19:32:50 +00002112 @staticmethod
2113 def _prefix_with(string, prefix):
2114 if string:
2115 string = prefix + string
2116 return string
2117
2118
jadmanski0afbb632008-06-06 21:10:57 +00002119 @classmethod
showard989f25d2008-10-01 11:38:11 +00002120 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002121 """
2122 Construct instances of our class based on the given database query.
2123
2124 @yields One class instance for each row fetched.
2125 """
showard63a34772008-08-18 19:32:50 +00002126 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2127 where = cls._prefix_with(where, 'WHERE ')
2128 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002129 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002130 'joins' : joins,
2131 'where' : where,
2132 'order_by' : order_by})
2133 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002134 for row in rows:
2135 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002136
mbligh36768f02008-02-22 18:28:33 +00002137
class IneligibleHostQueue(DBObject):
    """
    A (job, host) block, backed by the ineligible_host_queues table.
    While a row exists, the host is blocked for that job — rows are
    created/removed by HostQueueEntry.block_host()/unblock_host().
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002141
2142
showard89f84db2009-03-12 20:39:13 +00002143class AtomicGroup(DBObject):
2144 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002145 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2146 'invalid')
showard89f84db2009-03-12 20:39:13 +00002147
2148
showard989f25d2008-10-01 11:38:11 +00002149class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002150 _table_name = 'labels'
2151 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002152 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002153
2154
mbligh36768f02008-02-22 18:28:33 +00002155class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002156 _table_name = 'hosts'
2157 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2158 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2159
2160
jadmanski0afbb632008-06-06 21:10:57 +00002161 def current_task(self):
2162 rows = _db.execute("""
2163 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2164 """, (self.id,))
2165
2166 if len(rows) == 0:
2167 return None
2168 else:
2169 assert len(rows) == 1
2170 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002171 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002172
2173
jadmanski0afbb632008-06-06 21:10:57 +00002174 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002175 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002176 if self.current_task():
2177 self.current_task().requeue()
2178
showard6ae5ea92009-02-25 00:11:51 +00002179
jadmanski0afbb632008-06-06 21:10:57 +00002180 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002181 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002182 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002183
2184
showard170873e2009-01-07 00:22:26 +00002185 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002186 """
showard170873e2009-01-07 00:22:26 +00002187 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002188 """
2189 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002190 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002191 FROM labels
2192 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002193 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002194 ORDER BY labels.name
2195 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002196 platform = None
2197 all_labels = []
2198 for label_name, is_platform in rows:
2199 if is_platform:
2200 platform = label_name
2201 all_labels.append(label_name)
2202 return platform, all_labels
2203
2204
2205 def reverify_tasks(self):
2206 cleanup_task = CleanupTask(host=self)
2207 verify_task = VerifyTask(host=self)
2208 # just to make sure this host does not get taken away
2209 self.set_status('Cleaning')
2210 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002211
2212
mbligh36768f02008-02-22 18:28:33 +00002213class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002214 _table_name = 'host_queue_entries'
2215 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002216 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002217 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002218
2219
showarda3c58572009-03-12 20:36:59 +00002220 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002221 assert id or row
showarda3c58572009-03-12 20:36:59 +00002222 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002223 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002224
jadmanski0afbb632008-06-06 21:10:57 +00002225 if self.host_id:
2226 self.host = Host(self.host_id)
2227 else:
2228 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002229
showard170873e2009-01-07 00:22:26 +00002230 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002231 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002232
2233
showard89f84db2009-03-12 20:39:13 +00002234 @classmethod
2235 def clone(cls, template):
2236 """
2237 Creates a new row using the values from a template instance.
2238
2239 The new instance will not exist in the database or have a valid
2240 id attribute until its save() method is called.
2241 """
2242 assert isinstance(template, cls)
2243 new_row = [getattr(template, field) for field in cls._fields]
2244 clone = cls(row=new_row, new_record=True)
2245 clone.id = None
2246 return clone
2247
2248
showardc85c21b2008-11-24 22:17:37 +00002249 def _view_job_url(self):
2250 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2251
2252
jadmanski0afbb632008-06-06 21:10:57 +00002253 def set_host(self, host):
2254 if host:
2255 self.queue_log_record('Assigning host ' + host.hostname)
2256 self.update_field('host_id', host.id)
2257 self.update_field('active', True)
2258 self.block_host(host.id)
2259 else:
2260 self.queue_log_record('Releasing host')
2261 self.unblock_host(self.host.id)
2262 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002263
jadmanski0afbb632008-06-06 21:10:57 +00002264 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002265
2266
jadmanski0afbb632008-06-06 21:10:57 +00002267 def get_host(self):
2268 return self.host
mbligh36768f02008-02-22 18:28:33 +00002269
2270
jadmanski0afbb632008-06-06 21:10:57 +00002271 def queue_log_record(self, log_line):
2272 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002273 _drone_manager.write_lines_to_file(self.queue_log_path,
2274 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002275
2276
jadmanski0afbb632008-06-06 21:10:57 +00002277 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002278 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002279 row = [0, self.job.id, host_id]
2280 block = IneligibleHostQueue(row=row, new_record=True)
2281 block.save()
mblighe2586682008-02-29 22:45:46 +00002282
2283
jadmanski0afbb632008-06-06 21:10:57 +00002284 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002285 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002286 blocks = IneligibleHostQueue.fetch(
2287 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2288 for block in blocks:
2289 block.delete()
mblighe2586682008-02-29 22:45:46 +00002290
2291
showard2bab8f42008-11-12 18:15:22 +00002292 def set_execution_subdir(self, subdir=None):
2293 if subdir is None:
2294 assert self.get_host()
2295 subdir = self.get_host().hostname
2296 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002297
2298
showard6355f6b2008-12-05 18:52:13 +00002299 def _get_hostname(self):
2300 if self.host:
2301 return self.host.hostname
2302 return 'no host'
2303
2304
showard170873e2009-01-07 00:22:26 +00002305 def __str__(self):
2306 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2307
2308
jadmanski0afbb632008-06-06 21:10:57 +00002309 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002310 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002311
showardb18134f2009-03-20 20:52:18 +00002312 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002313
showardc85c21b2008-11-24 22:17:37 +00002314 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002315 self.update_field('complete', False)
2316 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002317
jadmanski0afbb632008-06-06 21:10:57 +00002318 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002319 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002320 self.update_field('complete', False)
2321 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002322
showardc85c21b2008-11-24 22:17:37 +00002323 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002324 self.update_field('complete', True)
2325 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002326
2327 should_email_status = (status.lower() in _notify_email_statuses or
2328 'all' in _notify_email_statuses)
2329 if should_email_status:
2330 self._email_on_status(status)
2331
2332 self._email_on_job_complete()
2333
2334
2335 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002336 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002337
2338 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2339 self.job.id, self.job.name, hostname, status)
2340 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2341 self.job.id, self.job.name, hostname, status,
2342 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002343 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002344
2345
2346 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002347 if not self.job.is_finished():
2348 return
showard542e8402008-09-19 20:16:18 +00002349
showardc85c21b2008-11-24 22:17:37 +00002350 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002351 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002352 for queue_entry in hosts_queue:
2353 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002354 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002355 queue_entry.status))
2356
2357 summary_text = "\n".join(summary_text)
2358 status_counts = models.Job.objects.get_status_counts(
2359 [self.job.id])[self.job.id]
2360 status = ', '.join('%d %s' % (count, status) for status, count
2361 in status_counts.iteritems())
2362
2363 subject = 'Autotest: Job ID: %s "%s" %s' % (
2364 self.job.id, self.job.name, status)
2365 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2366 self.job.id, self.job.name, status, self._view_job_url(),
2367 summary_text)
showard170873e2009-01-07 00:22:26 +00002368 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002369
2370
showard89f84db2009-03-12 20:39:13 +00002371 def run(self, assigned_host=None):
2372 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002373 assert assigned_host
2374 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002375 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002376
showardb18134f2009-03-20 20:52:18 +00002377 logging.info("%s/%s/%s scheduled on %s, status=%s",
2378 self.job.name, self.meta_host, self.atomic_group_id,
2379 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002380
jadmanski0afbb632008-06-06 21:10:57 +00002381 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002382
showard6ae5ea92009-02-25 00:11:51 +00002383
jadmanski0afbb632008-06-06 21:10:57 +00002384 def requeue(self):
2385 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002386 # verify/cleanup failure sets the execution subdir, so reset it here
2387 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002388 if self.meta_host:
2389 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002390
2391
jadmanski0afbb632008-06-06 21:10:57 +00002392 def handle_host_failure(self):
2393 """\
2394 Called when this queue entry's host has failed verification and
2395 repair.
2396 """
2397 assert not self.meta_host
2398 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002399 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002400
2401
jadmanskif7fa2cc2008-10-01 14:13:23 +00002402 @property
2403 def aborted_by(self):
2404 self._load_abort_info()
2405 return self._aborted_by
2406
2407
2408 @property
2409 def aborted_on(self):
2410 self._load_abort_info()
2411 return self._aborted_on
2412
2413
2414 def _load_abort_info(self):
2415 """ Fetch info about who aborted the job. """
2416 if hasattr(self, "_aborted_by"):
2417 return
2418 rows = _db.execute("""
2419 SELECT users.login, aborted_host_queue_entries.aborted_on
2420 FROM aborted_host_queue_entries
2421 INNER JOIN users
2422 ON users.id = aborted_host_queue_entries.aborted_by_id
2423 WHERE aborted_host_queue_entries.queue_entry_id = %s
2424 """, (self.id,))
2425 if rows:
2426 self._aborted_by, self._aborted_on = rows[0]
2427 else:
2428 self._aborted_by = self._aborted_on = None
2429
2430
showardb2e2c322008-10-14 17:33:55 +00002431 def on_pending(self):
2432 """
2433 Called when an entry in a synchronous job has passed verify. If the
2434 job is ready to run, returns an agent to run the job. Returns None
2435 otherwise.
2436 """
2437 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002438 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002439 if self.job.is_ready():
2440 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002441 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002442 return None
2443
2444
showardd3dc1992009-04-22 21:01:40 +00002445 def abort(self, dispatcher):
2446 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002447
showardd3dc1992009-04-22 21:01:40 +00002448 Status = models.HostQueueEntry.Status
2449 has_running_job_agent = (
2450 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2451 and dispatcher.get_agents_for_entry(self))
2452 if has_running_job_agent:
2453 # do nothing; post-job tasks will finish and then mark this entry
2454 # with status "Aborted" and take care of the host
2455 return
2456
2457 if self.status in (Status.STARTING, Status.PENDING):
2458 self.host.set_status(models.Host.Status.READY)
2459 elif self.status == Status.VERIFYING:
2460 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2461
2462 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002463
2464 def execution_tag(self):
2465 assert self.execution_subdir
2466 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002467
2468
mbligh36768f02008-02-22 18:28:33 +00002469class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002470 _table_name = 'jobs'
2471 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2472 'control_type', 'created_on', 'synch_count', 'timeout',
2473 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2474
2475
showarda3c58572009-03-12 20:36:59 +00002476 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002477 assert id or row
showarda3c58572009-03-12 20:36:59 +00002478 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002479
mblighe2586682008-02-29 22:45:46 +00002480
jadmanski0afbb632008-06-06 21:10:57 +00002481 def is_server_job(self):
2482 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002483
2484
showard170873e2009-01-07 00:22:26 +00002485 def tag(self):
2486 return "%s-%s" % (self.id, self.owner)
2487
2488
jadmanski0afbb632008-06-06 21:10:57 +00002489 def get_host_queue_entries(self):
2490 rows = _db.execute("""
2491 SELECT * FROM host_queue_entries
2492 WHERE job_id= %s
2493 """, (self.id,))
2494 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002495
jadmanski0afbb632008-06-06 21:10:57 +00002496 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002497
jadmanski0afbb632008-06-06 21:10:57 +00002498 return entries
mbligh36768f02008-02-22 18:28:33 +00002499
2500
jadmanski0afbb632008-06-06 21:10:57 +00002501 def set_status(self, status, update_queues=False):
2502 self.update_field('status',status)
2503
2504 if update_queues:
2505 for queue_entry in self.get_host_queue_entries():
2506 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002507
2508
jadmanski0afbb632008-06-06 21:10:57 +00002509 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002510 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2511 status='Pending')
2512 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002513
2514
jadmanski0afbb632008-06-06 21:10:57 +00002515 def num_machines(self, clause = None):
2516 sql = "job_id=%s" % self.id
2517 if clause:
2518 sql += " AND (%s)" % clause
2519 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002520
2521
jadmanski0afbb632008-06-06 21:10:57 +00002522 def num_queued(self):
2523 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002524
2525
jadmanski0afbb632008-06-06 21:10:57 +00002526 def num_active(self):
2527 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002528
2529
jadmanski0afbb632008-06-06 21:10:57 +00002530 def num_complete(self):
2531 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002532
2533
jadmanski0afbb632008-06-06 21:10:57 +00002534 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002535 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002536
mbligh36768f02008-02-22 18:28:33 +00002537
showard6bb7c292009-01-30 01:44:51 +00002538 def _not_yet_run_entries(self, include_verifying=True):
2539 statuses = [models.HostQueueEntry.Status.QUEUED,
2540 models.HostQueueEntry.Status.PENDING]
2541 if include_verifying:
2542 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2543 return models.HostQueueEntry.objects.filter(job=self.id,
2544 status__in=statuses)
2545
2546
2547 def _stop_all_entries(self):
2548 entries_to_stop = self._not_yet_run_entries(
2549 include_verifying=False)
2550 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002551 assert not child_entry.complete, (
2552 '%s status=%s, active=%s, complete=%s' %
2553 (child_entry.id, child_entry.status, child_entry.active,
2554 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002555 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2556 child_entry.host.status = models.Host.Status.READY
2557 child_entry.host.save()
2558 child_entry.status = models.HostQueueEntry.Status.STOPPED
2559 child_entry.save()
2560
showard2bab8f42008-11-12 18:15:22 +00002561 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002562 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002563 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002564 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002565
2566
jadmanski0afbb632008-06-06 21:10:57 +00002567 def write_to_machines_file(self, queue_entry):
2568 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002569 file_path = os.path.join(self.tag(), '.machines')
2570 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002571
2572
showard2bab8f42008-11-12 18:15:22 +00002573 def _next_group_name(self):
2574 query = models.HostQueueEntry.objects.filter(
2575 job=self.id).values('execution_subdir').distinct()
2576 subdirs = (entry['execution_subdir'] for entry in query)
2577 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2578 ids = [int(match.group(1)) for match in groups if match]
2579 if ids:
2580 next_id = max(ids) + 1
2581 else:
2582 next_id = 0
2583 return "group%d" % next_id
2584
2585
showard170873e2009-01-07 00:22:26 +00002586 def _write_control_file(self, execution_tag):
2587 control_path = _drone_manager.attach_file_to_execution(
2588 execution_tag, self.control_file)
2589 return control_path
mbligh36768f02008-02-22 18:28:33 +00002590
showardb2e2c322008-10-14 17:33:55 +00002591
showard2bab8f42008-11-12 18:15:22 +00002592 def get_group_entries(self, queue_entry_from_group):
2593 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002594 return list(HostQueueEntry.fetch(
2595 where='job_id=%s AND execution_subdir=%s',
2596 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002597
2598
showardb2e2c322008-10-14 17:33:55 +00002599 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002600 assert queue_entries
2601 execution_tag = queue_entries[0].execution_tag()
2602 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002603 hostnames = ','.join([entry.get_host().hostname
2604 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002605
showard87ba02a2009-04-20 19:37:32 +00002606 params = _autoserv_command_line(
2607 hostnames, execution_tag,
2608 ['-P', execution_tag, '-n',
2609 _drone_manager.absolute_path(control_path)],
2610 job=self)
mbligh36768f02008-02-22 18:28:33 +00002611
jadmanski0afbb632008-06-06 21:10:57 +00002612 if not self.is_server_job():
2613 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002614
showardb2e2c322008-10-14 17:33:55 +00002615 return params
mblighe2586682008-02-29 22:45:46 +00002616
mbligh36768f02008-02-22 18:28:33 +00002617
showardc9ae1782009-01-30 01:42:37 +00002618 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002619 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002620 return True
showard0fc38302008-10-23 00:44:07 +00002621 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002622 return queue_entry.get_host().dirty
2623 return False
showard21baa452008-10-21 00:08:39 +00002624
showardc9ae1782009-01-30 01:42:37 +00002625
2626 def _should_run_verify(self, queue_entry):
2627 do_not_verify = (queue_entry.host.protection ==
2628 host_protections.Protection.DO_NOT_VERIFY)
2629 if do_not_verify:
2630 return False
2631 return self.run_verify
2632
2633
2634 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002635 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002636 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002637 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002638 if self._should_run_verify(queue_entry):
2639 tasks.append(VerifyTask(queue_entry=queue_entry))
2640 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002641 return tasks
2642
2643
showard2bab8f42008-11-12 18:15:22 +00002644 def _assign_new_group(self, queue_entries):
2645 if len(queue_entries) == 1:
2646 group_name = queue_entries[0].get_host().hostname
2647 else:
2648 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002649 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002650 self.id, [entry.host.hostname for entry in queue_entries],
2651 group_name)
2652
2653 for queue_entry in queue_entries:
2654 queue_entry.set_execution_subdir(group_name)
2655
2656
2657 def _choose_group_to_run(self, include_queue_entry):
2658 chosen_entries = [include_queue_entry]
2659
2660 num_entries_needed = self.synch_count - 1
2661 if num_entries_needed > 0:
2662 pending_entries = HostQueueEntry.fetch(
2663 where='job_id = %s AND status = "Pending" AND id != %s',
2664 params=(self.id, include_queue_entry.id))
2665 chosen_entries += list(pending_entries)[:num_entries_needed]
2666
2667 self._assign_new_group(chosen_entries)
2668 return chosen_entries
2669
2670
2671 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002672 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002673 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2674 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002675
showard2bab8f42008-11-12 18:15:22 +00002676 queue_entries = self._choose_group_to_run(queue_entry)
2677 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002678
2679
2680 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002681 for queue_entry in queue_entries:
2682 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002683 params = self._get_autoserv_params(queue_entries)
2684 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2685 cmd=params)
2686 tasks = initial_tasks + [queue_task]
2687 entry_ids = [entry.id for entry in queue_entries]
2688
showard170873e2009-01-07 00:22:26 +00002689 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002690
2691
mbligh36768f02008-02-22 18:28:33 +00002692if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002693 main()