blob: b269a986d9da72227c59293635c79bb271bf9b00 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
mbligh36768f02008-02-22 18:28:33 +000023RESULTS_DIR = '.'
24AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000025DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000026
27AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showardd3dc1992009-04-22 21:01:40 +000040_AUTOSERV_PID_FILE = '.autoserv_execute'
41_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
42_PARSER_PID_FILE = '.parser_execute'
43
44_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
45 _PARSER_PID_FILE)
46
showard35162b02009-03-03 02:17:30 +000047# error message to leave in results dir when an autoserv process disappears
48# mysteriously
49_LOST_PROCESS_ERROR = """\
50Autoserv failed abnormally during execution for this job, probably due to a
51system error on the Autotest server. Full results may not be available. Sorry.
52"""
53
mbligh6f8bab42008-02-29 22:45:14 +000054_db = None
mbligh36768f02008-02-22 18:28:33 +000055_shutdown = False
showard170873e2009-01-07 00:22:26 +000056_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
57_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000058_testing_mode = False
showard542e8402008-09-19 20:16:18 +000059_base_url = None
showardc85c21b2008-11-24 22:17:37 +000060_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000061_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
showardb18134f2009-03-20 20:52:18 +000063# load the logging settings
64scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
showard50e463b2009-04-07 18:13:45 +000065if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
66 os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
showardb18134f2009-03-20 20:52:18 +000067# Here we export the log name, using the same convention as autoserv's results
68# directory.
mblighc9895aa2009-04-01 18:36:58 +000069if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
70 scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
71else:
72 scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
73 os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
74
showardb18134f2009-03-20 20:52:18 +000075logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Run the scheduler, logging any exception that escapes.

    The bare except is intentional: it logs a traceback for *anything*
    that escapes (including SystemExit/KeyboardInterrupt) and then
    re-raises so the process still terminates with the original error.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse command-line options, load configuration, start the status
    server and run the dispatcher loop until _shutdown is set (see
    handle_sigint) or an exception escapes the loop."""
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific initialization hook, if one is installed.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # The config value is a free-form list separated by whitespace/,/;/:.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly shutdown: flush pending emails, then stop the status server,
    # the drone manager and the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown.

    Only flips the module-level _shutdown flag; the dispatcher loop in
    main_without_exception_handling() notices it on its next iteration.
    """
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """Perform scheduler startup.

    Sets up logging redirection, writes the monitor_db pidfile, connects
    the database, enables Django autocommit, installs the SIGINT handler
    and initializes the drone manager.

    @param logfile: path to redirect stdout/stderr to, or None to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make server-side tools (autoserv etc.) resolvable for subprocesses.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Renamed local from 'drones': the old name shadowed the
    # autotest_lib.scheduler.drones module imported at the top of the file.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect stdout/stderr to log files, at both the Python level and
    the file-descriptor level (so child processes inherit the redirection).

    @param logfile: path for stdout; stderr goes to logfile + '.err'.
    """
    out_path = logfile
    err_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_path, err_path)

    # Unbuffered append so log lines appear immediately.
    out_stream = open(out_path, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    # Duplicate onto fds 1/2 first, then rebind the Python-level objects.
    os.dup2(out_stream.fileno(), sys.stdout.fileno())
    os.dup2(err_stream.fileno(), sys.stderr.fileno())

    sys.stdout = out_stream
    sys.stderr = err_stream
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """Build the argv list for launching autoserv.

    @param machines: comma-separated hostnames passed to autoserv's -m.
    @param results_dir: results directory, resolved to an absolute path
            via the drone manager.
    @param extra_args: additional argv entries appended at the end.
    @param job: optional Job supplying owner (-u) and name (-l); looked up
            from queue_entry when omitted.
    @param queue_entry: optional HostQueueEntry used to find the job.
    """
    cmd = [_autoserv_path, '-p', '-m', machines,
           '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        cmd.extend(['-u', job.owner, '-l', job.name])
    return cmd + extra_args
234
235
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs (e.g. a
    host carries more than one atomic group label)."""
238
239
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() loads an in-memory snapshot of schedulable state (ready
    hosts, job/host ACLs, labels, dependencies) from the database; the
    find_eligible_* methods then hand out hosts from that snapshot,
    removing each host from the pool as it is claimed during a single
    scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for hosts that can be scheduled:
        unlocked, status NULL or 'Ready', and with no active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Format ids as a comma-separated string for a SQL IN (...) clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Execute a two-column query (query must contain one %s
        placeholder for the id list) and return {left: set(rights)},
        or the reversed mapping when flip is True."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left: set(rights)}; flip=True
        swaps the columns first."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids of the job's owner."""
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it must not run on."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids the job depends on."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids that contain it."""
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return the pair (labels_to_hosts, hosts_to_labels) of set-valued
        dicts for the given hosts (both empty when host_ids is empty)."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload the scheduling snapshot for this dispatcher cycle.

        @param pending_queue_entries: the HostQueueEntries awaiting
                scheduling; only state relevant to their jobs is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the
        job neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True when the host passes the ACL, dependency, only_if_needed
        and atomic-group checks for the entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specifically-assigned host; returns None when
        that host is ineligible or no longer available this cycle."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still in this cycle's available pool and is
        not marked invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label;
        returns None when no host qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return an available Host for this (non-atomic-group) entry, or
        None when nothing suitable is available this cycle."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group = sorted(eligible_hosts_in_group)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
586
587
showard170873e2009-01-07 00:22:26 +0000588class Dispatcher(object):
    def __init__(self):
        # Agents currently executing work, plus reverse indexes from
        # host id / queue entry id to the set of agents touching them
        # (maintained by add_agent/remove_agent).
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        # Periodic DB cleanup tasks; intervals come from scheduler_config.
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000599
mbligh36768f02008-02-22 18:28:33 +0000600
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup tasks and recover state left
        over from a previous scheduler run.

        @param recover_hosts: when True, also run host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000610
611
    def tick(self):
        """Run one scheduler cycle.

        The order is significant: drone state is refreshed first, work is
        scheduled and handled, then queued drone actions and notification
        emails are flushed at the end of the cycle.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000621
showard97aed502008-11-04 02:01:24 +0000622
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; the 'maybe' suggests
        each task checks its own interval internally (see
        monitor_db_cleanup) -- confirm there."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
showard170873e2009-01-07 00:22:26 +0000628 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
629 for object_id in object_ids:
630 agent_dict.setdefault(object_id, set()).add(agent)
631
632
633 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
634 for object_id in object_ids:
635 assert object_id in agent_dict
636 agent_dict[object_id].remove(agent)
637
638
jadmanski0afbb632008-06-06 21:10:57 +0000639 def add_agent(self, agent):
640 self._agents.append(agent)
641 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000642 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
643 self._register_agent_for_ids(self._queue_entry_agents,
644 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000645
showard170873e2009-01-07 00:22:26 +0000646
647 def get_agents_for_entry(self, queue_entry):
648 """
649 Find agents corresponding to the specified queue_entry.
650 """
showardd3dc1992009-04-22 21:01:40 +0000651 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000652
653
654 def host_has_agent(self, host):
655 """
656 Determine if there is currently an Agent present using this host.
657 """
658 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000659
660
jadmanski0afbb632008-06-06 21:10:57 +0000661 def remove_agent(self, agent):
662 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000663 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
664 agent)
665 self._unregister_agent_for_ids(self._queue_entry_agents,
666 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000667
668
    def _recover_processes(self):
        """Reattach to autoserv/parser processes left running by a previous
        scheduler instance, requeue entries whose processes are gone, and
        reverify the remaining hosts."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000679
showard170873e2009-01-07 00:22:26 +0000680
    def _register_pidfiles(self):
        """Register, with the drone manager, every pidfile that recovery
        might need to read for active entries."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # each entry may have up to three pidfiles, one per phase
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000691
692
showardd3dc1992009-04-22 21:01:40 +0000693 def _recover_entries_with_status(self, status, orphans, pidfile_name,
694 recover_entries_fn):
695 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000696 for queue_entry in queue_entries:
697 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000698 # synchronous job we've already recovered
699 continue
showardd3dc1992009-04-22 21:01:40 +0000700 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000701 execution_tag = queue_entry.execution_tag()
702 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000703 run_monitor.attach_to_existing_process(execution_tag,
704 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000705 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000706 # execution apparently never happened
707 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000708 continue
mbligh90a549d2008-03-25 23:52:34 +0000709
showardd3dc1992009-04-22 21:01:40 +0000710 logging.info('Recovering %s entry %s (process %s)',
711 status.lower(),
712 ', '.join(str(entry) for entry in queue_entries),
713 run_monitor.get_process())
714 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
715 orphans.discard(run_monitor.get_process())
716
717
718 def _kill_remaining_orphan_processes(self, orphans):
719 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000720 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000721 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000722
showard170873e2009-01-07 00:22:26 +0000723
showardd3dc1992009-04-22 21:01:40 +0000724 def _recover_running_entries(self, orphans):
725 def recover_entries(job, queue_entries, run_monitor):
726 if run_monitor is not None:
727 queue_task = RecoveryQueueTask(job=job,
728 queue_entries=queue_entries,
729 run_monitor=run_monitor)
730 self.add_agent(Agent(tasks=[queue_task],
731 num_processes=len(queue_entries)))
732 # else, _requeue_other_active_entries will cover this
733
734 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
735 orphans, '.autoserv_execute',
736 recover_entries)
737
738
739 def _recover_gathering_entries(self, orphans):
740 def recover_entries(job, queue_entries, run_monitor):
741 gather_task = GatherLogsTask(job, queue_entries,
742 run_monitor=run_monitor)
743 self.add_agent(Agent([gather_task]))
744
745 self._recover_entries_with_status(
746 models.HostQueueEntry.Status.GATHERING,
747 orphans, _CRASHINFO_PID_FILE, recover_entries)
748
749
750 def _recover_parsing_entries(self, orphans):
751 def recover_entries(job, queue_entries, run_monitor):
752 reparse_task = FinalReparseTask(queue_entries,
753 run_monitor=run_monitor)
754 self.add_agent(Agent([reparse_task], num_processes=0))
755
756 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
757 orphans, _PARSER_PID_FILE,
758 recover_entries)
759
760
    def _recover_all_recoverable_entries(self):
        """
        Recover running, gathering and parsing entries, then kill any
        orphaned autoserv processes nothing reclaimed.  Each recovery step
        removes reclaimed processes from the orphans set.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000767
showard97aed502008-11-04 02:01:24 +0000768
showard170873e2009-01-07 00:22:26 +0000769 def _requeue_other_active_entries(self):
770 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000771 where='active AND NOT complete AND '
772 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000773 for queue_entry in queue_entries:
774 if self.get_agents_for_entry(queue_entry):
775 # entry has already been recovered
776 continue
showardd3dc1992009-04-22 21:01:40 +0000777 if queue_entry.aborted:
778 queue_entry.abort(self)
779 continue
780
781 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000782 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000783 if queue_entry.host:
784 tasks = queue_entry.host.reverify_tasks()
785 self.add_agent(Agent(tasks))
786 agent = queue_entry.requeue()
787
788
    def _reverify_remaining_hosts(self):
        """Schedule reverification for hosts left in transient states."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000804
805
jadmanski0afbb632008-06-06 21:10:57 +0000806 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000807 print_message='Reverifying host %s'):
808 full_where='locked = 0 AND invalid = 0 AND ' + where
809 for host in Host.fetch(where=full_where):
810 if self.host_has_agent(host):
811 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000812 continue
showard170873e2009-01-07 00:22:26 +0000813 if print_message:
showardb18134f2009-03-20 20:52:18 +0000814 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000815 tasks = host.reverify_tasks()
816 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000817
818
jadmanski0afbb632008-06-06 21:10:57 +0000819 def _recover_hosts(self):
820 # recover "Repair Failed" hosts
821 message = 'Reverifying dead host %s'
822 self._reverify_hosts_where("status = 'Repair Failed'",
823 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000824
825
showard04c82c52008-05-29 19:38:12 +0000826
showardb95b1bd2008-08-15 18:11:04 +0000827 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000828 # prioritize by job priority, then non-metahost over metahost, then FIFO
829 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000830 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000831 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000832 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000833
834
showard89f84db2009-03-12 20:39:13 +0000835 def _refresh_pending_queue_entries(self):
836 """
837 Lookup the pending HostQueueEntries and call our HostScheduler
838 refresh() method given that list. Return the list.
839
840 @returns A list of pending HostQueueEntries sorted in priority order.
841 """
showard63a34772008-08-18 19:32:50 +0000842 queue_entries = self._get_pending_queue_entries()
843 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000844 return []
showardb95b1bd2008-08-15 18:11:04 +0000845
showard63a34772008-08-18 19:32:50 +0000846 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000847
showard89f84db2009-03-12 20:39:13 +0000848 return queue_entries
849
850
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # itertools.izip is the python 2 spelling of lazy zip()
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
877
878
879 def _schedule_new_jobs(self):
880 queue_entries = self._refresh_pending_queue_entries()
881 if not queue_entries:
882 return
883
showard63a34772008-08-18 19:32:50 +0000884 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000885 if (queue_entry.atomic_group_id is None or
886 queue_entry.host_id is not None):
887 assigned_host = self._host_scheduler.find_eligible_host(
888 queue_entry)
889 if assigned_host:
890 self._run_queue_entry(queue_entry, assigned_host)
891 else:
892 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000893
894
895 def _run_queue_entry(self, queue_entry, host):
896 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000897 # in some cases (synchronous jobs with run_verify=False), agent may be
898 # None
showard9976ce92008-10-15 20:28:13 +0000899 if agent:
900 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000901
902
jadmanski0afbb632008-06-06 21:10:57 +0000903 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000904 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
905 for agent in self.get_agents_for_entry(entry):
906 agent.abort()
907 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000908
909
showard324bf812009-01-20 23:23:38 +0000910 def _can_start_agent(self, agent, num_started_this_cycle,
911 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000912 # always allow zero-process agents to run
913 if agent.num_processes == 0:
914 return True
915 # don't allow any nonzero-process agents to run after we've reached a
916 # limit (this avoids starvation of many-process agents)
917 if have_reached_limit:
918 return False
919 # total process throttling
showard324bf812009-01-20 23:23:38 +0000920 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000921 return False
922 # if a single agent exceeds the per-cycle throttling, still allow it to
923 # run when it's the first agent in the cycle
924 if num_started_this_cycle == 0:
925 return True
926 # per-cycle throttling
927 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000928 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000929 return False
930 return True
931
932
    def _handle_agents(self):
        """
        Run one tick for every agent: discard finished agents, start
        not-yet-running ones as the throttling rules allow, and tick the
        rest.  num_started_this_cycle counts only processes newly started
        during this call.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # stop starting many-process agents for the rest of
                    # the cycle once any agent is refused
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000951
952
    def _process_recurring_runs(self):
        """
        Instantiate jobs from any RecurringRun templates whose start date
        has arrived, then reschedule or delete each template depending on
        its remaining loop count (0 means repeat forever, 1 means last run).
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts must be (re)created for every instantiation
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         name=job.name,
                                         priority=job.priority,
                                         control_file=job.control_file,
                                         control_type=job.control_type,
                                         is_template=False,
                                         synch_count=job.synch_count,
                                         timeout=job.timeout,
                                         run_verify=job.run_verify,
                                         email_list=job.email_list,
                                         dependencies=dependencies,
                                         reboot_before=job.reboot_before,
                                         reboot_after=job.reboot_after,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # job creation failure must not kill the scheduler tick
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1002
1003
class PidfileRunMonitor(object):
    """
    Monitor a single autoserv-style process through its pidfile, via the
    drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False  # set once we give up on the process
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Wrap command with nice at nice_level; no-op for falsy levels."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start command under the drone manager and begin monitoring it."""
        assert command is not None
        # use the shared helper rather than duplicating the nice-wrapping
        # logic inline (a nice_level of 0 is a no-op either way)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Begin monitoring a process started by an earlier scheduler run."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if it exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process was found behind the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the process; only valid when has_process() is True."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raises _PidfileException
        (and resets state) on unparseable contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log and email the error, then declare the process lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the exception details are captured by format_exc(); the old
            # code bound the exception to an unused local
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Only gives up (and notifies) once PIDFILE_TIMEOUT has elapsed.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001161
1162
class Agent(object):
    """
    Runs an ordered queue of tasks on behalf of the dispatcher, tracking
    the host and queue-entry ids the tasks touch.
    """
    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: ordered list of tasks to execute.
        @param num_processes: process count charged against the scheduler's
                throttling limits.
        """
        self.active_task = None
        self.queue = None
        self.dispatcher = None      # filled in by the dispatcher
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(
                [task.queue_entry_ids for task in tasks])
        self.host_ids = self._union_ids(
                [task.host_ids for task in tasks])

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        """Union the given iterables of ids into a single set."""
        combined = set()
        for ids in id_lists:
            combined.update(ids)
        return combined


    def add_task(self, task):
        """Append task to the queue and point it back at this agent."""
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """
        Poll the active task, advancing through the queue as tasks finish;
        returns as soon as a task is still in progress or everything is done.
        """
        while not self.is_done():
            task = self.active_task
            if task:
                task.poll()
                if not task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire a finished active task (handling failure) and pull the
        next task off the queue, if any."""
        logging.info("agent picking task")
        finished = self.active_task
        if finished is not None:
            assert finished.is_done()
            if not finished.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        """Drop remaining tasks and hand failure tasks to a fresh Agent."""
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids
        # will get reset.
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        """Abort queued tasks until one refuses the abort or none remain."""
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1239
showardd3dc1992009-04-22 21:01:40 +00001240
class AgentTask(object):
    """
    A single unit of work for an Agent: optionally runs a command through a
    PidfileRunMonitor and reports success or failure back to the agent.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to run, or None for process-less
                tasks.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks handed to a fresh Agent if this task
                fails.
        @param pidfile_name: accepted for interface compatibility; note it
                is not stored here -- run() takes its own pidfile_name.
        @param paired_with_pidfile: likewise accepted but unused in __init__.
        """
        self.done = False
        # None default avoids the shared-mutable-default-argument pitfall
        # the original "failure_tasks=[]" signature had
        self.failure_tasks = failure_tasks if failure_tasks is not None else []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None       # set by Agent.add_task()
        self.monitor = None     # PidfileRunMonitor, created by run()
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task affects, from either
        a list of entries or a bare host."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if needed, then advance it one step."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Check the monitored process; finish once it has an exit code.
        A task with no monitor finishes immediately as a failure."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task done (idempotent) and run the epilog."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is accepted for caller compatibility but is
        # currently ignored -- the drone manager chooses the temporary path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy the task's log file to the results repository, if any."""
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark the task done and aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a per-host, timestamped log path."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the execution tag shared by all entries (asserts that
        they do in fact share one)."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Copy results to the repository and kick off a final reparse."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if any) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001373
1374
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single 'key=value' keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # raise the exception type, not the NotImplemented singleton --
        # "raise NotImplemented" is itself a TypeError, not a useful signal
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line, paired with the job's finished process;
        silently skipped when there is no process."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <creation timestamp>) keyval pair."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))
1402
1403
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host; may fail a queue entry on failure."""

    def __init__(self, host, queue_entry=None):
        """\
        @param host: Host object to repair.
        @param queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host as Repairing and requeue the entry being failed."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in the temporary repair results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """
        Mark queue_entry_to_fail as failed: write job keyvals, copy the
        repair logs into the job's results, and parse them.
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """On success the host is Ready; otherwise Repair Failed (and the
        associated queue entry, if any, is failed too)."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001472
1473
showard8fe93b52008-11-18 17:53:22 +00001474class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001475 def epilog(self):
1476 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001477 should_copy_results = (self.queue_entry and not self.success
1478 and not self.queue_entry.meta_host)
1479 if should_copy_results:
1480 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001481 destination = os.path.join(self.queue_entry.execution_tag(),
1482 os.path.basename(self.log_file))
1483 _drone_manager.copy_to_results_repository(
1484 self.monitor.get_process(), self.log_file,
1485 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001486
1487
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host; a failure spawns a RepairTask."""

    def __init__(self, queue_entry=None, host=None):
        # callers supply exactly one of queue_entry / host
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host if host else queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
                self.host.hostname, self.temp_results_dir, ['-v'],
                queue_entry=queue_entry)
        # a failed verify hands the host off to repair
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        # pass the raw constructor arguments through (host may be None here)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and the queue entry, if any) as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """A successful verify leaves the host Ready; failures go to repair."""
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001518
1519
showardd9205182009-04-27 20:09:55 +00001520class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001521 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001522 self.job = job
1523 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001524 super(QueueTask, self).__init__(cmd, self._execution_tag())
1525 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001526
1527
showard73ec0442009-02-07 02:05:20 +00001528 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001529 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001530
1531
1532 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1533 keyval_contents = '\n'.join(self._format_keyval(key, value)
1534 for key, value in keyval_dict.iteritems())
1535 # always end with a newline to allow additional keyvals to be written
1536 keyval_contents += '\n'
1537 _drone_manager.attach_file_to_execution(self._execution_tag(),
1538 keyval_contents,
1539 file_path=keyval_path)
1540
1541
1542 def _write_keyvals_before_job(self, keyval_dict):
1543 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1544
1545
showard170873e2009-01-07 00:22:26 +00001546 def _write_host_keyvals(self, host):
1547 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1548 host.hostname)
1549 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001550 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1551 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001552
1553
showard170873e2009-01-07 00:22:26 +00001554 def _execution_tag(self):
1555 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001556
1557
jadmanski0afbb632008-06-06 21:10:57 +00001558 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001559 queued_key, queued_time = self._job_queued_keyval(self.job)
1560 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001561 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001562 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001563 queue_entry.set_status('Running')
1564 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001565 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001566 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001567 assert len(self.queue_entries) == 1
1568 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001569
1570
showard35162b02009-03-03 02:17:30 +00001571 def _write_lost_process_error_file(self):
1572 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1573 _drone_manager.write_lines_to_file(error_file_path,
1574 [_LOST_PROCESS_ERROR])
1575
1576
showardd3dc1992009-04-22 21:01:40 +00001577 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001578 if not self.monitor:
1579 return
1580
showardd9205182009-04-27 20:09:55 +00001581 self._write_job_finished()
1582
showardd3dc1992009-04-22 21:01:40 +00001583 # both of these conditionals can be true, iff the process ran, wrote a
1584 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001585 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001586 gather_task = GatherLogsTask(self.job, self.queue_entries)
1587 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001588
1589 if self.monitor.lost_process:
1590 self._write_lost_process_error_file()
1591 for queue_entry in self.queue_entries:
1592 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001593
1594
showardcbd74612008-11-19 21:42:02 +00001595 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001596 _drone_manager.write_lines_to_file(
1597 os.path.join(self._execution_tag(), 'status.log'),
1598 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001599 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001600
1601
jadmanskif7fa2cc2008-10-01 14:13:23 +00001602 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001603 if not self.monitor or not self.monitor.has_process():
1604 return
1605
jadmanskif7fa2cc2008-10-01 14:13:23 +00001606 # build up sets of all the aborted_by and aborted_on values
1607 aborted_by, aborted_on = set(), set()
1608 for queue_entry in self.queue_entries:
1609 if queue_entry.aborted_by:
1610 aborted_by.add(queue_entry.aborted_by)
1611 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1612 aborted_on.add(t)
1613
1614 # extract some actual, unique aborted by value and write it out
1615 assert len(aborted_by) <= 1
1616 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001617 aborted_by_value = aborted_by.pop()
1618 aborted_on_value = max(aborted_on)
1619 else:
1620 aborted_by_value = 'autotest_system'
1621 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001622
showarda0382352009-02-11 23:36:43 +00001623 self._write_keyval_after_job("aborted_by", aborted_by_value)
1624 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001625
showardcbd74612008-11-19 21:42:02 +00001626 aborted_on_string = str(datetime.datetime.fromtimestamp(
1627 aborted_on_value))
1628 self._write_status_comment('Job aborted by %s on %s' %
1629 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001630
1631
jadmanski0afbb632008-06-06 21:10:57 +00001632 def abort(self):
1633 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001634 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001635 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001636
1637
jadmanski0afbb632008-06-06 21:10:57 +00001638 def epilog(self):
1639 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001640 self._finish_task()
1641 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001642
1643
mblighbb421852008-03-11 22:36:16 +00001644class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001645 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001646 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001647 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001648
1649
jadmanski0afbb632008-06-06 21:10:57 +00001650 def run(self):
1651 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001652
1653
jadmanski0afbb632008-06-06 21:10:57 +00001654 def prolog(self):
1655 # recovering an existing process - don't do prolog
1656 pass
mblighbb421852008-03-11 22:36:16 +00001657
1658
showardd3dc1992009-04-22 21:01:40 +00001659class PostJobTask(AgentTask):
1660 def __init__(self, queue_entries, pidfile_name, logfile_name,
1661 run_monitor=None):
1662 """
1663 If run_monitor != None, we're recovering a running task.
1664 """
1665 self._queue_entries = queue_entries
1666 self._pidfile_name = pidfile_name
1667 self._run_monitor = run_monitor
1668
1669 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1670 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1671 self._autoserv_monitor = PidfileRunMonitor()
1672 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1673 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1674
1675 if _testing_mode:
1676 command = 'true'
1677 else:
1678 command = self._generate_command(self._results_dir)
1679
1680 super(PostJobTask, self).__init__(cmd=command,
1681 working_directory=self._execution_tag)
1682
1683 self.log_file = os.path.join(self._execution_tag, logfile_name)
1684 self._final_status = self._determine_final_status()
1685
1686
1687 def _generate_command(self, results_dir):
1688 raise NotImplementedError('Subclasses must override this')
1689
1690
1691 def _job_was_aborted(self):
1692 was_aborted = None
1693 for queue_entry in self._queue_entries:
1694 queue_entry.update_from_database()
1695 if was_aborted is None: # first queue entry
1696 was_aborted = bool(queue_entry.aborted)
1697 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1698 email_manager.manager.enqueue_notify_email(
1699 'Inconsistent abort state',
1700 'Queue entries have inconsistent abort state: ' +
1701 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1702 # don't crash here, just assume true
1703 return True
1704 return was_aborted
1705
1706
1707 def _determine_final_status(self):
1708 if self._job_was_aborted():
1709 return models.HostQueueEntry.Status.ABORTED
1710
1711 # we'll use a PidfileRunMonitor to read the autoserv exit status
1712 if self._autoserv_monitor.exit_code() == 0:
1713 return models.HostQueueEntry.Status.COMPLETED
1714 return models.HostQueueEntry.Status.FAILED
1715
1716
1717 def run(self):
1718 if self._run_monitor is not None:
1719 self.monitor = self._run_monitor
1720 else:
1721 # make sure we actually have results to work with.
1722 # this should never happen in normal operation.
1723 if not self._autoserv_monitor.has_process():
1724 email_manager.manager.enqueue_notify_email(
1725 'No results in post-job task',
1726 'No results in post-job task at %s' %
1727 self._autoserv_monitor.pidfile_id)
1728 self.finished(False)
1729 return
1730
1731 super(PostJobTask, self).run(
1732 pidfile_name=self._pidfile_name,
1733 paired_with_pidfile=self._paired_with_pidfile)
1734
1735
1736 def _set_all_statuses(self, status):
1737 for queue_entry in self._queue_entries:
1738 queue_entry.set_status(status)
1739
1740
1741 def abort(self):
1742 # override AgentTask.abort() to avoid killing the process and ending
1743 # the task. post-job tasks continue when the job is aborted.
1744 pass
1745
1746
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        # job's reboot_after setting drives _reboot_hosts() below
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv --collect-crashinfo against every host in the job
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Decide whether hosts get a cleanup (reboot) or go straight to
        Ready, based on abort state and the job's reboot_after setting.
        Note the precedence: an aborted job always triggers a reboot."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy results (using the original autoserv's monitor), start the
        # final reparse, then deal with the hosts
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        if self._final_status == models.HostQueueEntry.Status.COMPLETED:
            # don't run at all if Autoserv exited successfully
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
1811
1812
showard8fe93b52008-11-18 17:53:22 +00001813class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001814 def __init__(self, host=None, queue_entry=None):
1815 assert bool(host) ^ bool(queue_entry)
1816 if queue_entry:
1817 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001818 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001819 self.host = host
showard170873e2009-01-07 00:22:26 +00001820
1821 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001822 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1823 ['--cleanup'],
1824 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001825 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001826 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1827 failure_tasks=[repair_task])
1828
1829 self._set_ids(host=host, queue_entries=[queue_entry])
1830 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001831
mblighd5c95802008-03-05 00:33:46 +00001832
jadmanski0afbb632008-06-06 21:10:57 +00001833 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001834 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001835 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001836 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001837
mblighd5c95802008-03-05 00:33:46 +00001838
showard21baa452008-10-21 00:08:39 +00001839 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001840 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001841 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001842 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001843 self.host.update_field('dirty', 0)
1844
1845
showardd3dc1992009-04-22 21:01:40 +00001846class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001847 _num_running_parses = 0
1848
showardd3dc1992009-04-22 21:01:40 +00001849 def __init__(self, queue_entries, run_monitor=None):
1850 super(FinalReparseTask, self).__init__(queue_entries,
1851 pidfile_name=_PARSER_PID_FILE,
1852 logfile_name='.parse.log',
1853 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001854 # don't use _set_ids, since we don't want to set the host_ids
1855 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001856 self._parse_started = False
1857
showard97aed502008-11-04 02:01:24 +00001858
1859 @classmethod
1860 def _increment_running_parses(cls):
1861 cls._num_running_parses += 1
1862
1863
1864 @classmethod
1865 def _decrement_running_parses(cls):
1866 cls._num_running_parses -= 1
1867
1868
1869 @classmethod
1870 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001871 return (cls._num_running_parses <
1872 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001873
1874
1875 def prolog(self):
1876 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001877 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001878
1879
1880 def epilog(self):
1881 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001882 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001883
1884
showardd3dc1992009-04-22 21:01:40 +00001885 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001886 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001887 results_dir]
showard97aed502008-11-04 02:01:24 +00001888
1889
showard08a36412009-05-05 01:01:13 +00001890 def tick(self):
1891 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001892 # and we can, at which point we revert to default behavior
1893 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001894 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001895 else:
1896 self._try_starting_parse()
1897
1898
1899 def run(self):
1900 # override run() to not actually run unless we can
1901 self._try_starting_parse()
1902
1903
1904 def _try_starting_parse(self):
1905 if not self._can_run_new_parse():
1906 return
showard170873e2009-01-07 00:22:26 +00001907
showard97aed502008-11-04 02:01:24 +00001908 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001909 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001910
showard97aed502008-11-04 02:01:24 +00001911 self._increment_running_parses()
1912 self._parse_started = True
1913
1914
1915 def finished(self, success):
1916 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001917 if self._parse_started:
1918 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001919
1920
showardc9ae1782009-01-30 01:42:37 +00001921class SetEntryPendingTask(AgentTask):
1922 def __init__(self, queue_entry):
1923 super(SetEntryPendingTask, self).__init__(cmd='')
1924 self._queue_entry = queue_entry
1925 self._set_ids(queue_entries=[queue_entry])
1926
1927
1928 def run(self):
1929 agent = self._queue_entry.on_pending()
1930 if agent:
1931 self.agent.dispatcher.add_agent(agent)
1932 self.finished(True)
1933
1934
showarda3c58572009-03-12 20:36:59 +00001935class DBError(Exception):
1936 """Raised by the DBObject constructor when its select fails."""
1937
1938
mbligh36768f02008-02-22 18:28:33 +00001939class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001940 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001941
1942 # Subclasses MUST override these:
1943 _table_name = ''
1944 _fields = ()
1945
showarda3c58572009-03-12 20:36:59 +00001946 # A mapping from (type, id) to the instance of the object for that
1947 # particular id. This prevents us from creating new Job() and Host()
1948 # instances for every HostQueueEntry object that we instantiate as
1949 # multiple HQEs often share the same Job.
1950 _instances_by_type_and_id = weakref.WeakValueDictionary()
1951 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001952
showarda3c58572009-03-12 20:36:59 +00001953
1954 def __new__(cls, id=None, **kwargs):
1955 """
1956 Look to see if we already have an instance for this particular type
1957 and id. If so, use it instead of creating a duplicate instance.
1958 """
1959 if id is not None:
1960 instance = cls._instances_by_type_and_id.get((cls, id))
1961 if instance:
1962 return instance
1963 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1964
1965
1966 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001967 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001968 assert self._table_name, '_table_name must be defined in your class'
1969 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001970 if not new_record:
1971 if self._initialized and not always_query:
1972 return # We've already been initialized.
1973 if id is None:
1974 id = row[0]
1975 # Tell future constructors to use us instead of re-querying while
1976 # this instance is still around.
1977 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001978
showard6ae5ea92009-02-25 00:11:51 +00001979 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001980
jadmanski0afbb632008-06-06 21:10:57 +00001981 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001982
jadmanski0afbb632008-06-06 21:10:57 +00001983 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001984 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001985
showarda3c58572009-03-12 20:36:59 +00001986 if self._initialized:
1987 differences = self._compare_fields_in_row(row)
1988 if differences:
showard7629f142009-03-27 21:02:02 +00001989 logging.warn(
1990 'initialized %s %s instance requery is updating: %s',
1991 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001992 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001993 self._initialized = True
1994
1995
1996 @classmethod
1997 def _clear_instance_cache(cls):
1998 """Used for testing, clear the internal instance cache."""
1999 cls._instances_by_type_and_id.clear()
2000
2001
showardccbd6c52009-03-21 00:10:21 +00002002 def _fetch_row_from_db(self, row_id):
2003 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2004 rows = _db.execute(sql, (row_id,))
2005 if not rows:
showard76e29d12009-04-15 21:53:10 +00002006 raise DBError("row not found (table=%s, row id=%s)"
2007 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002008 return rows[0]
2009
2010
showarda3c58572009-03-12 20:36:59 +00002011 def _assert_row_length(self, row):
2012 assert len(row) == len(self._fields), (
2013 "table = %s, row = %s/%d, fields = %s/%d" % (
2014 self.__table, row, len(row), self._fields, len(self._fields)))
2015
2016
2017 def _compare_fields_in_row(self, row):
2018 """
2019 Given a row as returned by a SELECT query, compare it to our existing
2020 in memory fields.
2021
2022 @param row - A sequence of values corresponding to fields named in
2023 The class attribute _fields.
2024
2025 @returns A dictionary listing the differences keyed by field name
2026 containing tuples of (current_value, row_value).
2027 """
2028 self._assert_row_length(row)
2029 differences = {}
2030 for field, row_value in itertools.izip(self._fields, row):
2031 current_value = getattr(self, field)
2032 if current_value != row_value:
2033 differences[field] = (current_value, row_value)
2034 return differences
showard2bab8f42008-11-12 18:15:22 +00002035
2036
2037 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002038 """
2039 Update our field attributes using a single row returned by SELECT.
2040
2041 @param row - A sequence of values corresponding to fields named in
2042 the class fields list.
2043 """
2044 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002045
showard2bab8f42008-11-12 18:15:22 +00002046 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002047 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002048 setattr(self, field, value)
2049 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002050
showard2bab8f42008-11-12 18:15:22 +00002051 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002052
mblighe2586682008-02-29 22:45:46 +00002053
showardccbd6c52009-03-21 00:10:21 +00002054 def update_from_database(self):
2055 assert self.id is not None
2056 row = self._fetch_row_from_db(self.id)
2057 self._update_fields_from_row(row)
2058
2059
jadmanski0afbb632008-06-06 21:10:57 +00002060 def count(self, where, table = None):
2061 if not table:
2062 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002063
jadmanski0afbb632008-06-06 21:10:57 +00002064 rows = _db.execute("""
2065 SELECT count(*) FROM %s
2066 WHERE %s
2067 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002068
jadmanski0afbb632008-06-06 21:10:57 +00002069 assert len(rows) == 1
2070
2071 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002072
2073
showardd3dc1992009-04-22 21:01:40 +00002074 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002075 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002076
showard2bab8f42008-11-12 18:15:22 +00002077 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002078 return
mbligh36768f02008-02-22 18:28:33 +00002079
mblighf8c624d2008-07-03 16:58:45 +00002080 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002081 _db.execute(query, (value, self.id))
2082
showard2bab8f42008-11-12 18:15:22 +00002083 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002084
2085
jadmanski0afbb632008-06-06 21:10:57 +00002086 def save(self):
2087 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002088 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002089 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002090 values = []
2091 for key in keys:
2092 value = getattr(self, key)
2093 if value is None:
2094 values.append('NULL')
2095 else:
2096 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002097 values_str = ','.join(values)
2098 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2099 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002100 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002101 # Update our id to the one the database just assigned to us.
2102 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002103
2104
jadmanski0afbb632008-06-06 21:10:57 +00002105 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002106 self._instances_by_type_and_id.pop((type(self), id), None)
2107 self._initialized = False
2108 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002109 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2110 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002111
2112
showard63a34772008-08-18 19:32:50 +00002113 @staticmethod
2114 def _prefix_with(string, prefix):
2115 if string:
2116 string = prefix + string
2117 return string
2118
2119
jadmanski0afbb632008-06-06 21:10:57 +00002120 @classmethod
showard989f25d2008-10-01 11:38:11 +00002121 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002122 """
2123 Construct instances of our class based on the given database query.
2124
2125 @yields One class instance for each row fetched.
2126 """
showard63a34772008-08-18 19:32:50 +00002127 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2128 where = cls._prefix_with(where, 'WHERE ')
2129 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002130 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002131 'joins' : joins,
2132 'where' : where,
2133 'order_by' : order_by})
2134 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002135 for row in rows:
2136 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002137
mbligh36768f02008-02-22 18:28:33 +00002138
class IneligibleHostQueue(DBObject):
    # One row per (job, host) pair that must not be scheduled together again;
    # rows are created by HostQueueEntry.block_host() when a host is assigned.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002142
2143
showard89f84db2009-03-12 20:39:13 +00002144class AtomicGroup(DBObject):
2145 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002146 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2147 'invalid')
showard89f84db2009-03-12 20:39:13 +00002148
2149
showard989f25d2008-10-01 11:38:11 +00002150class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002151 _table_name = 'labels'
2152 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002153 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002154
2155
mbligh36768f02008-02-22 18:28:33 +00002156class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002157 _table_name = 'hosts'
2158 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2159 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2160
2161
jadmanski0afbb632008-06-06 21:10:57 +00002162 def current_task(self):
2163 rows = _db.execute("""
2164 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2165 """, (self.id,))
2166
2167 if len(rows) == 0:
2168 return None
2169 else:
2170 assert len(rows) == 1
2171 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002172 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002173
2174
jadmanski0afbb632008-06-06 21:10:57 +00002175 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002176 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002177 if self.current_task():
2178 self.current_task().requeue()
2179
showard6ae5ea92009-02-25 00:11:51 +00002180
jadmanski0afbb632008-06-06 21:10:57 +00002181 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002182 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002183 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002184
2185
showard170873e2009-01-07 00:22:26 +00002186 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002187 """
showard170873e2009-01-07 00:22:26 +00002188 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002189 """
2190 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002191 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002192 FROM labels
2193 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002194 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002195 ORDER BY labels.name
2196 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002197 platform = None
2198 all_labels = []
2199 for label_name, is_platform in rows:
2200 if is_platform:
2201 platform = label_name
2202 all_labels.append(label_name)
2203 return platform, all_labels
2204
2205
2206 def reverify_tasks(self):
2207 cleanup_task = CleanupTask(host=self)
2208 verify_task = VerifyTask(host=self)
2209 # just to make sure this host does not get taken away
2210 self.set_status('Cleaning')
2211 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002212
2213
mbligh36768f02008-02-22 18:28:33 +00002214class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002215 _table_name = 'host_queue_entries'
2216 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002217 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002218 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002219
2220
showarda3c58572009-03-12 20:36:59 +00002221 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002222 assert id or row
showarda3c58572009-03-12 20:36:59 +00002223 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002224 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002225
jadmanski0afbb632008-06-06 21:10:57 +00002226 if self.host_id:
2227 self.host = Host(self.host_id)
2228 else:
2229 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002230
showard170873e2009-01-07 00:22:26 +00002231 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002232 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002233
2234
showard89f84db2009-03-12 20:39:13 +00002235 @classmethod
2236 def clone(cls, template):
2237 """
2238 Creates a new row using the values from a template instance.
2239
2240 The new instance will not exist in the database or have a valid
2241 id attribute until its save() method is called.
2242 """
2243 assert isinstance(template, cls)
2244 new_row = [getattr(template, field) for field in cls._fields]
2245 clone = cls(row=new_row, new_record=True)
2246 clone.id = None
2247 return clone
2248
2249
showardc85c21b2008-11-24 22:17:37 +00002250 def _view_job_url(self):
2251 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2252
2253
jadmanski0afbb632008-06-06 21:10:57 +00002254 def set_host(self, host):
2255 if host:
2256 self.queue_log_record('Assigning host ' + host.hostname)
2257 self.update_field('host_id', host.id)
2258 self.update_field('active', True)
2259 self.block_host(host.id)
2260 else:
2261 self.queue_log_record('Releasing host')
2262 self.unblock_host(self.host.id)
2263 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002264
jadmanski0afbb632008-06-06 21:10:57 +00002265 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002266
2267
jadmanski0afbb632008-06-06 21:10:57 +00002268 def get_host(self):
2269 return self.host
mbligh36768f02008-02-22 18:28:33 +00002270
2271
jadmanski0afbb632008-06-06 21:10:57 +00002272 def queue_log_record(self, log_line):
2273 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002274 _drone_manager.write_lines_to_file(self.queue_log_path,
2275 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002276
2277
jadmanski0afbb632008-06-06 21:10:57 +00002278 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002279 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002280 row = [0, self.job.id, host_id]
2281 block = IneligibleHostQueue(row=row, new_record=True)
2282 block.save()
mblighe2586682008-02-29 22:45:46 +00002283
2284
jadmanski0afbb632008-06-06 21:10:57 +00002285 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002286 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002287 blocks = IneligibleHostQueue.fetch(
2288 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2289 for block in blocks:
2290 block.delete()
mblighe2586682008-02-29 22:45:46 +00002291
2292
showard2bab8f42008-11-12 18:15:22 +00002293 def set_execution_subdir(self, subdir=None):
2294 if subdir is None:
2295 assert self.get_host()
2296 subdir = self.get_host().hostname
2297 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002298
2299
showard6355f6b2008-12-05 18:52:13 +00002300 def _get_hostname(self):
2301 if self.host:
2302 return self.host.hostname
2303 return 'no host'
2304
2305
showard170873e2009-01-07 00:22:26 +00002306 def __str__(self):
2307 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2308
2309
jadmanski0afbb632008-06-06 21:10:57 +00002310 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002311 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002312
showardb18134f2009-03-20 20:52:18 +00002313 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002314
showardc85c21b2008-11-24 22:17:37 +00002315 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002316 self.update_field('complete', False)
2317 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002318
jadmanski0afbb632008-06-06 21:10:57 +00002319 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002320 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002321 self.update_field('complete', False)
2322 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002323
showardc85c21b2008-11-24 22:17:37 +00002324 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002325 self.update_field('complete', True)
2326 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002327
2328 should_email_status = (status.lower() in _notify_email_statuses or
2329 'all' in _notify_email_statuses)
2330 if should_email_status:
2331 self._email_on_status(status)
2332
2333 self._email_on_job_complete()
2334
2335
2336 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002337 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002338
2339 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2340 self.job.id, self.job.name, hostname, status)
2341 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2342 self.job.id, self.job.name, hostname, status,
2343 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002344 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002345
2346
2347 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002348 if not self.job.is_finished():
2349 return
showard542e8402008-09-19 20:16:18 +00002350
showardc85c21b2008-11-24 22:17:37 +00002351 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002352 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002353 for queue_entry in hosts_queue:
2354 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002355 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002356 queue_entry.status))
2357
2358 summary_text = "\n".join(summary_text)
2359 status_counts = models.Job.objects.get_status_counts(
2360 [self.job.id])[self.job.id]
2361 status = ', '.join('%d %s' % (count, status) for status, count
2362 in status_counts.iteritems())
2363
2364 subject = 'Autotest: Job ID: %s "%s" %s' % (
2365 self.job.id, self.job.name, status)
2366 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2367 self.job.id, self.job.name, status, self._view_job_url(),
2368 summary_text)
showard170873e2009-01-07 00:22:26 +00002369 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002370
2371
showard89f84db2009-03-12 20:39:13 +00002372 def run(self, assigned_host=None):
2373 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002374 assert assigned_host
2375 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002376 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002377
showardb18134f2009-03-20 20:52:18 +00002378 logging.info("%s/%s/%s scheduled on %s, status=%s",
2379 self.job.name, self.meta_host, self.atomic_group_id,
2380 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002381
jadmanski0afbb632008-06-06 21:10:57 +00002382 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002383
showard6ae5ea92009-02-25 00:11:51 +00002384
jadmanski0afbb632008-06-06 21:10:57 +00002385 def requeue(self):
2386 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002387 # verify/cleanup failure sets the execution subdir, so reset it here
2388 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002389 if self.meta_host:
2390 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002391
2392
jadmanski0afbb632008-06-06 21:10:57 +00002393 def handle_host_failure(self):
2394 """\
2395 Called when this queue entry's host has failed verification and
2396 repair.
2397 """
2398 assert not self.meta_host
2399 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002400 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002401
2402
jadmanskif7fa2cc2008-10-01 14:13:23 +00002403 @property
2404 def aborted_by(self):
2405 self._load_abort_info()
2406 return self._aborted_by
2407
2408
2409 @property
2410 def aborted_on(self):
2411 self._load_abort_info()
2412 return self._aborted_on
2413
2414
2415 def _load_abort_info(self):
2416 """ Fetch info about who aborted the job. """
2417 if hasattr(self, "_aborted_by"):
2418 return
2419 rows = _db.execute("""
2420 SELECT users.login, aborted_host_queue_entries.aborted_on
2421 FROM aborted_host_queue_entries
2422 INNER JOIN users
2423 ON users.id = aborted_host_queue_entries.aborted_by_id
2424 WHERE aborted_host_queue_entries.queue_entry_id = %s
2425 """, (self.id,))
2426 if rows:
2427 self._aborted_by, self._aborted_on = rows[0]
2428 else:
2429 self._aborted_by = self._aborted_on = None
2430
2431
showardb2e2c322008-10-14 17:33:55 +00002432 def on_pending(self):
2433 """
2434 Called when an entry in a synchronous job has passed verify. If the
2435 job is ready to run, returns an agent to run the job. Returns None
2436 otherwise.
2437 """
2438 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002439 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002440 if self.job.is_ready():
2441 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002442 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002443 return None
2444
2445
showardd3dc1992009-04-22 21:01:40 +00002446 def abort(self, dispatcher):
2447 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002448
showardd3dc1992009-04-22 21:01:40 +00002449 Status = models.HostQueueEntry.Status
2450 has_running_job_agent = (
2451 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2452 and dispatcher.get_agents_for_entry(self))
2453 if has_running_job_agent:
2454 # do nothing; post-job tasks will finish and then mark this entry
2455 # with status "Aborted" and take care of the host
2456 return
2457
2458 if self.status in (Status.STARTING, Status.PENDING):
2459 self.host.set_status(models.Host.Status.READY)
2460 elif self.status == Status.VERIFYING:
2461 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2462
2463 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002464
2465 def execution_tag(self):
2466 assert self.execution_subdir
2467 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002468
2469
mbligh36768f02008-02-22 18:28:33 +00002470class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002471 _table_name = 'jobs'
2472 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2473 'control_type', 'created_on', 'synch_count', 'timeout',
2474 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2475
2476
showarda3c58572009-03-12 20:36:59 +00002477 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002478 assert id or row
showarda3c58572009-03-12 20:36:59 +00002479 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002480
mblighe2586682008-02-29 22:45:46 +00002481
jadmanski0afbb632008-06-06 21:10:57 +00002482 def is_server_job(self):
2483 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002484
2485
showard170873e2009-01-07 00:22:26 +00002486 def tag(self):
2487 return "%s-%s" % (self.id, self.owner)
2488
2489
jadmanski0afbb632008-06-06 21:10:57 +00002490 def get_host_queue_entries(self):
2491 rows = _db.execute("""
2492 SELECT * FROM host_queue_entries
2493 WHERE job_id= %s
2494 """, (self.id,))
2495 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002496
jadmanski0afbb632008-06-06 21:10:57 +00002497 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002498
jadmanski0afbb632008-06-06 21:10:57 +00002499 return entries
mbligh36768f02008-02-22 18:28:33 +00002500
2501
jadmanski0afbb632008-06-06 21:10:57 +00002502 def set_status(self, status, update_queues=False):
2503 self.update_field('status',status)
2504
2505 if update_queues:
2506 for queue_entry in self.get_host_queue_entries():
2507 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002508
2509
jadmanski0afbb632008-06-06 21:10:57 +00002510 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002511 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2512 status='Pending')
2513 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002514
2515
jadmanski0afbb632008-06-06 21:10:57 +00002516 def num_machines(self, clause = None):
2517 sql = "job_id=%s" % self.id
2518 if clause:
2519 sql += " AND (%s)" % clause
2520 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002521
2522
jadmanski0afbb632008-06-06 21:10:57 +00002523 def num_queued(self):
2524 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002525
2526
jadmanski0afbb632008-06-06 21:10:57 +00002527 def num_active(self):
2528 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002529
2530
jadmanski0afbb632008-06-06 21:10:57 +00002531 def num_complete(self):
2532 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002533
2534
jadmanski0afbb632008-06-06 21:10:57 +00002535 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002536 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002537
mbligh36768f02008-02-22 18:28:33 +00002538
showard6bb7c292009-01-30 01:44:51 +00002539 def _not_yet_run_entries(self, include_verifying=True):
2540 statuses = [models.HostQueueEntry.Status.QUEUED,
2541 models.HostQueueEntry.Status.PENDING]
2542 if include_verifying:
2543 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2544 return models.HostQueueEntry.objects.filter(job=self.id,
2545 status__in=statuses)
2546
2547
2548 def _stop_all_entries(self):
2549 entries_to_stop = self._not_yet_run_entries(
2550 include_verifying=False)
2551 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002552 assert not child_entry.complete, (
2553 '%s status=%s, active=%s, complete=%s' %
2554 (child_entry.id, child_entry.status, child_entry.active,
2555 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002556 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2557 child_entry.host.status = models.Host.Status.READY
2558 child_entry.host.save()
2559 child_entry.status = models.HostQueueEntry.Status.STOPPED
2560 child_entry.save()
2561
showard2bab8f42008-11-12 18:15:22 +00002562 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002563 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002564 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002565 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002566
2567
jadmanski0afbb632008-06-06 21:10:57 +00002568 def write_to_machines_file(self, queue_entry):
2569 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002570 file_path = os.path.join(self.tag(), '.machines')
2571 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002572
2573
showard2bab8f42008-11-12 18:15:22 +00002574 def _next_group_name(self):
2575 query = models.HostQueueEntry.objects.filter(
2576 job=self.id).values('execution_subdir').distinct()
2577 subdirs = (entry['execution_subdir'] for entry in query)
2578 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2579 ids = [int(match.group(1)) for match in groups if match]
2580 if ids:
2581 next_id = max(ids) + 1
2582 else:
2583 next_id = 0
2584 return "group%d" % next_id
2585
2586
showard170873e2009-01-07 00:22:26 +00002587 def _write_control_file(self, execution_tag):
2588 control_path = _drone_manager.attach_file_to_execution(
2589 execution_tag, self.control_file)
2590 return control_path
mbligh36768f02008-02-22 18:28:33 +00002591
showardb2e2c322008-10-14 17:33:55 +00002592
showard2bab8f42008-11-12 18:15:22 +00002593 def get_group_entries(self, queue_entry_from_group):
2594 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002595 return list(HostQueueEntry.fetch(
2596 where='job_id=%s AND execution_subdir=%s',
2597 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002598
2599
showardb2e2c322008-10-14 17:33:55 +00002600 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002601 assert queue_entries
2602 execution_tag = queue_entries[0].execution_tag()
2603 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002604 hostnames = ','.join([entry.get_host().hostname
2605 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002606
showard87ba02a2009-04-20 19:37:32 +00002607 params = _autoserv_command_line(
2608 hostnames, execution_tag,
2609 ['-P', execution_tag, '-n',
2610 _drone_manager.absolute_path(control_path)],
2611 job=self)
mbligh36768f02008-02-22 18:28:33 +00002612
jadmanski0afbb632008-06-06 21:10:57 +00002613 if not self.is_server_job():
2614 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002615
showardb2e2c322008-10-14 17:33:55 +00002616 return params
mblighe2586682008-02-29 22:45:46 +00002617
mbligh36768f02008-02-22 18:28:33 +00002618
showardc9ae1782009-01-30 01:42:37 +00002619 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002620 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002621 return True
showard0fc38302008-10-23 00:44:07 +00002622 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002623 return queue_entry.get_host().dirty
2624 return False
showard21baa452008-10-21 00:08:39 +00002625
showardc9ae1782009-01-30 01:42:37 +00002626
2627 def _should_run_verify(self, queue_entry):
2628 do_not_verify = (queue_entry.host.protection ==
2629 host_protections.Protection.DO_NOT_VERIFY)
2630 if do_not_verify:
2631 return False
2632 return self.run_verify
2633
2634
2635 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002636 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002637 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002638 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002639 if self._should_run_verify(queue_entry):
2640 tasks.append(VerifyTask(queue_entry=queue_entry))
2641 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002642 return tasks
2643
2644
showard2bab8f42008-11-12 18:15:22 +00002645 def _assign_new_group(self, queue_entries):
2646 if len(queue_entries) == 1:
2647 group_name = queue_entries[0].get_host().hostname
2648 else:
2649 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002650 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002651 self.id, [entry.host.hostname for entry in queue_entries],
2652 group_name)
2653
2654 for queue_entry in queue_entries:
2655 queue_entry.set_execution_subdir(group_name)
2656
2657
2658 def _choose_group_to_run(self, include_queue_entry):
2659 chosen_entries = [include_queue_entry]
2660
2661 num_entries_needed = self.synch_count - 1
2662 if num_entries_needed > 0:
2663 pending_entries = HostQueueEntry.fetch(
2664 where='job_id = %s AND status = "Pending" AND id != %s',
2665 params=(self.id, include_queue_entry.id))
2666 chosen_entries += list(pending_entries)[:num_entries_needed]
2667
2668 self._assign_new_group(chosen_entries)
2669 return chosen_entries
2670
2671
2672 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002673 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002674 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2675 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002676
showard2bab8f42008-11-12 18:15:22 +00002677 queue_entries = self._choose_group_to_run(queue_entry)
2678 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002679
2680
2681 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002682 for queue_entry in queue_entries:
2683 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002684 params = self._get_autoserv_params(queue_entries)
2685 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2686 cmd=params)
2687 tasks = initial_tasks + [queue_task]
2688 entry_ids = [entry.id for entry in queue_entries]
2689
showard170873e2009-01-07 00:22:26 +00002690 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002691
2692
mbligh36768f02008-02-22 18:28:33 +00002693if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002694 main()