blob: 6e7c24b2e949650a67d23b963417007e9c0019a4 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# --- Module-level configuration and process-wide scheduler state. ---

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An externally-set AUTOTEST_DIR overrides the install path computed above.
# NOTE: uses `in` rather than the deprecated dict.has_key() (removed in Py3).
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# Pidfile names written into each execution's results directory by the
# three process stages the scheduler tracks.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Process-wide mutable state, initialized by init() / main().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Scheduler entry point.

    Runs the real main loop, logging (and re-raising) any exception that
    escapes it so the failure is recorded before the process dies.
    """
    try:
        main_without_exception_handling()
    except:
        # Bare except is deliberate: even SystemExit/KeyboardInterrupt should
        # be logged before propagating.
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse arguments, configure global state, and run the dispatch loop.

    Side effects: mutates the module globals RESULTS_DIR, _autoserv_path,
    _testing_mode, _notify_email_statuses and _base_url; changes the process
    cwd; starts the status server; and loops on Dispatcher.tick() until the
    SIGINT handler sets _shutdown.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific init hook if a site_monitor_db module exists,
    # otherwise the no-op dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # notify_email_statuses is a free-form separated list; split on any of
    # whitespace , ; : and drop empty tokens.
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT flips _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush queued mail, stop the status server, shut down
    # drones, close the DB.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to stop after the current tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """One-time scheduler initialization.

    @param logfile - Optional path; when set, stdout/stderr are redirected
            there via enable_logging().

    Side effects: writes the monitor_db pid file, connects the global _db,
    enables Django autocommit, installs the SIGINT handler, and initializes
    the global _drone_manager from config.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    # Under --test, point the DB layer at the stress-test database instead.
    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Drone hostnames come from a comma-separated config value.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect this process's stdout/stderr into logfile and logfile.err.

    Both the underlying file descriptors (via dup2, so child processes
    inherit the redirection) and the Python-level sys.stdout/sys.stderr
    objects are replaced. Files are opened unbuffered in append mode.
    """
    out_path = logfile
    err_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_path, err_path)
    out_stream = open(out_path, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    os.dup2(out_stream.fileno(), sys.stdout.fileno())
    os.dup2(err_stream.fileno(), sys.stderr.fileno())

    sys.stdout = out_stream
    sys.stderr = err_stream
mbligh36768f02008-02-22 18:28:33 +0000223
224
showard87ba02a2009-04-20 19:37:32 +0000225def _autoserv_command_line(machines, results_dir, extra_args, job=None,
226 queue_entry=None):
227 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
228 '-r', _drone_manager.absolute_path(results_dir)]
229 if job or queue_entry:
230 if not job:
231 job = queue_entry.job
232 autoserv_argv += ['-u', job.owner, '-l', job.name]
233 return autoserv_argv + extra_args
234
235
class SchedulerError(Exception):
    """Raised by HostScheduler when scheduling state is found inconsistent."""
    pass
238
239
class HostScheduler(object):
    """Matches pending host queue entries against the pool of Ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, labels,
    dependencies) from the database into in-memory dicts; the
    find_eligible_* methods then consume that cached state, popping hosts
    out of self._hosts_available as they are handed out so each host is
    assigned at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a list of ids as the body of a SQL "IN (...)" clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with %s IN-list placeholder) and return
        the rows as a {left_id: set(right_ids)} mapping (see
        _process_many2many_dict)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left: set(right)}; flip swaps columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job owner's ACLs."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts each job may not use."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) maps for the hosts.

        Both directions come from one query over hosts_labels.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild all cached scheduling state for one scheduling cycle.

        @param pending_queue_entries - the HQEs about to be scheduled; only
                their jobs' ACL/dependency/ineligibility data is fetched.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine ACL, dependency, only_if_needed and atomic-group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick and claim any eligible host carrying the metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this entry, or None if none is free."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # atomic-group entries are handled by find_eligible_atomic_group
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
584
585
showard170873e2009-01-07 00:22:26 +0000586class Dispatcher(object):
    def __init__(self):
        # Active Agent objects being driven each tick.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        # Periodic DB cleanup tasks: user-configurable interval plus a
        # fixed 24-hour upkeep pass.
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes: host id -> set(agents), queue entry id -> set(agents)
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000597
mbligh36768f02008-02-22 18:28:33 +0000598
    def initialize(self, recover_hosts=True):
        """Prepare for the first tick: init cleanup tasks and recover state.

        @param recover_hosts - when True, also attempt dead-host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000608
609
    def tick(self):
        """Run one scheduler iteration.

        The call order is the contract: refresh drone state first, then
        cleanup/abort processing, then scheduling, then agent work, and
        finally flush drone actions and queued email.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000619
showard97aed502008-11-04 02:01:24 +0000620
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run (each decides if it's due)."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000624
mbligh36768f02008-02-22 18:28:33 +0000625
showard170873e2009-01-07 00:22:26 +0000626 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
627 for object_id in object_ids:
628 agent_dict.setdefault(object_id, set()).add(agent)
629
630
631 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
632 for object_id in object_ids:
633 assert object_id in agent_dict
634 agent_dict[object_id].remove(agent)
635
636
jadmanski0afbb632008-06-06 21:10:57 +0000637 def add_agent(self, agent):
638 self._agents.append(agent)
639 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000640 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
641 self._register_agent_for_ids(self._queue_entry_agents,
642 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000643
showard170873e2009-01-07 00:22:26 +0000644
645 def get_agents_for_entry(self, queue_entry):
646 """
647 Find agents corresponding to the specified queue_entry.
648 """
showardd3dc1992009-04-22 21:01:40 +0000649 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000650
651
652 def host_has_agent(self, host):
653 """
654 Determine if there is currently an Agent present using this host.
655 """
656 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def remove_agent(self, agent):
660 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000661 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
662 agent)
663 self._unregister_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000665
666
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run.

        Order matters: pidfiles must be registered before the drone refresh
        so their contents are read, and drones are reinitialized only after
        orphan kills have been flushed.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000677
showard170873e2009-01-07 00:22:26 +0000678
    def _register_pidfiles(self):
        """Register every pidfile of every in-flight queue entry for reading."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # each entry may have autoserv, crashinfo and parser pidfiles
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000689
690
showardd3dc1992009-04-22 21:01:40 +0000691 def _recover_entries_with_status(self, status, orphans, pidfile_name,
692 recover_entries_fn):
693 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000694 for queue_entry in queue_entries:
695 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000696 # synchronous job we've already recovered
697 continue
showardd3dc1992009-04-22 21:01:40 +0000698 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000699 execution_tag = queue_entry.execution_tag()
700 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000701 run_monitor.attach_to_existing_process(execution_tag,
702 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000703 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000704 # execution apparently never happened
705 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000706 continue
mbligh90a549d2008-03-25 23:52:34 +0000707
showardd3dc1992009-04-22 21:01:40 +0000708 logging.info('Recovering %s entry %s (process %s)',
709 status.lower(),
710 ', '.join(str(entry) for entry in queue_entries),
711 run_monitor.get_process())
712 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
713 orphans.discard(run_monitor.get_process())
714
715
716 def _kill_remaining_orphan_processes(self, orphans):
717 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000718 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000719 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000720
showard170873e2009-01-07 00:22:26 +0000721
showardd3dc1992009-04-22 21:01:40 +0000722 def _recover_running_entries(self, orphans):
723 def recover_entries(job, queue_entries, run_monitor):
724 if run_monitor is not None:
725 queue_task = RecoveryQueueTask(job=job,
726 queue_entries=queue_entries,
727 run_monitor=run_monitor)
728 self.add_agent(Agent(tasks=[queue_task],
729 num_processes=len(queue_entries)))
730 # else, _requeue_other_active_entries will cover this
731
732 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
733 orphans, '.autoserv_execute',
734 recover_entries)
735
736
737 def _recover_gathering_entries(self, orphans):
738 def recover_entries(job, queue_entries, run_monitor):
739 gather_task = GatherLogsTask(job, queue_entries,
740 run_monitor=run_monitor)
741 self.add_agent(Agent([gather_task]))
742
743 self._recover_entries_with_status(
744 models.HostQueueEntry.Status.GATHERING,
745 orphans, _CRASHINFO_PID_FILE, recover_entries)
746
747
748 def _recover_parsing_entries(self, orphans):
749 def recover_entries(job, queue_entries, run_monitor):
750 reparse_task = FinalReparseTask(queue_entries,
751 run_monitor=run_monitor)
752 self.add_agent(Agent([reparse_task], num_processes=0))
753
754 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
755 orphans, _PARSER_PID_FILE,
756 recover_entries)
757
758
    def _recover_all_recoverable_entries(self):
        # Recover in-flight entries state by state.  Each successful
        # re-attach discards its process from the orphan set; whatever is
        # left afterwards has no owner and gets killed, so the kill must
        # come last.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000765
showard97aed502008-11-04 02:01:24 +0000766
showard170873e2009-01-07 00:22:26 +0000767 def _requeue_other_active_entries(self):
768 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000769 where='active AND NOT complete AND '
770 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000771 for queue_entry in queue_entries:
772 if self.get_agents_for_entry(queue_entry):
773 # entry has already been recovered
774 continue
showardd3dc1992009-04-22 21:01:40 +0000775 if queue_entry.aborted:
776 queue_entry.abort(self)
777 continue
778
779 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000780 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000781 if queue_entry.host:
782 tasks = queue_entry.host.reverify_tasks()
783 self.add_agent(Agent(tasks))
784 agent = queue_entry.requeue()
785
786
    def _reverify_remaining_hosts(self):
        # Queue reverification for hosts left in transient states by the
        # previous scheduler instance.
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000802
803
jadmanski0afbb632008-06-06 21:10:57 +0000804 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000805 print_message='Reverifying host %s'):
806 full_where='locked = 0 AND invalid = 0 AND ' + where
807 for host in Host.fetch(where=full_where):
808 if self.host_has_agent(host):
809 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000810 continue
showard170873e2009-01-07 00:22:26 +0000811 if print_message:
showardb18134f2009-03-20 20:52:18 +0000812 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000813 tasks = host.reverify_tasks()
814 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000815
816
jadmanski0afbb632008-06-06 21:10:57 +0000817 def _recover_hosts(self):
818 # recover "Repair Failed" hosts
819 message = 'Reverifying dead host %s'
820 self._reverify_hosts_where("status = 'Repair Failed'",
821 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000822
823
showard04c82c52008-05-29 19:38:12 +0000824
showardb95b1bd2008-08-15 18:11:04 +0000825 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000826 # prioritize by job priority, then non-metahost over metahost, then FIFO
827 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000828 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000829 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000830 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000831
832
showard89f84db2009-03-12 20:39:13 +0000833 def _refresh_pending_queue_entries(self):
834 """
835 Lookup the pending HostQueueEntries and call our HostScheduler
836 refresh() method given that list. Return the list.
837
838 @returns A list of pending HostQueueEntries sorted in priority order.
839 """
showard63a34772008-08-18 19:32:50 +0000840 queue_entries = self._get_pending_queue_entries()
841 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000842 return []
showardb95b1bd2008-08-15 18:11:04 +0000843
showard63a34772008-08-18 19:32:50 +0000844 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000845
showard89f84db2009-03-12 20:39:13 +0000846 return queue_entries
847
848
849 def _schedule_atomic_group(self, queue_entry):
850 """
851 Schedule the given queue_entry on an atomic group of hosts.
852
853 Returns immediately if there are insufficient available hosts.
854
855 Creates new HostQueueEntries based off of queue_entry for the
856 scheduled hosts and starts them all running.
857 """
858 # This is a virtual host queue entry representing an entire
859 # atomic group, find a group and schedule their hosts.
860 group_hosts = self._host_scheduler.find_eligible_atomic_group(
861 queue_entry)
862 if not group_hosts:
863 return
864 # The first assigned host uses the original HostQueueEntry
865 group_queue_entries = [queue_entry]
866 for assigned_host in group_hosts[1:]:
867 # Create a new HQE for every additional assigned_host.
868 new_hqe = HostQueueEntry.clone(queue_entry)
869 new_hqe.save()
870 group_queue_entries.append(new_hqe)
871 assert len(group_queue_entries) == len(group_hosts)
872 for queue_entry, host in itertools.izip(group_queue_entries,
873 group_hosts):
874 self._run_queue_entry(queue_entry, host)
875
876
877 def _schedule_new_jobs(self):
878 queue_entries = self._refresh_pending_queue_entries()
879 if not queue_entries:
880 return
881
showard63a34772008-08-18 19:32:50 +0000882 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000883 if (queue_entry.atomic_group_id is None or
884 queue_entry.host_id is not None):
885 assigned_host = self._host_scheduler.find_eligible_host(
886 queue_entry)
887 if assigned_host:
888 self._run_queue_entry(queue_entry, assigned_host)
889 else:
890 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000891
892
893 def _run_queue_entry(self, queue_entry, host):
894 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000895 # in some cases (synchronous jobs with run_verify=False), agent may be
896 # None
showard9976ce92008-10-15 20:28:13 +0000897 if agent:
898 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000899
900
jadmanski0afbb632008-06-06 21:10:57 +0000901 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000902 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
903 for agent in self.get_agents_for_entry(entry):
904 agent.abort()
905 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000906
907
showard324bf812009-01-20 23:23:38 +0000908 def _can_start_agent(self, agent, num_started_this_cycle,
909 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000910 # always allow zero-process agents to run
911 if agent.num_processes == 0:
912 return True
913 # don't allow any nonzero-process agents to run after we've reached a
914 # limit (this avoids starvation of many-process agents)
915 if have_reached_limit:
916 return False
917 # total process throttling
showard324bf812009-01-20 23:23:38 +0000918 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000919 return False
920 # if a single agent exceeds the per-cycle throttling, still allow it to
921 # run when it's the first agent in the cycle
922 if num_started_this_cycle == 0:
923 return True
924 # per-cycle throttling
925 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000926 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000927 return False
928 return True
929
930
jadmanski0afbb632008-06-06 21:10:57 +0000931 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000932 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000933 have_reached_limit = False
934 # iterate over copy, so we can remove agents during iteration
935 for agent in list(self._agents):
936 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000937 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000938 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000939 continue
940 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000941 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000942 have_reached_limit):
943 have_reached_limit = True
944 continue
showard4c5374f2008-09-04 17:02:56 +0000945 num_started_this_cycle += agent.num_processes
946 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000947 logging.info('%d running processes',
948 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000949
950
showard29f7cd22009-04-29 21:16:24 +0000951 def _process_recurring_runs(self):
952 recurring_runs = models.RecurringRun.objects.filter(
953 start_date__lte=datetime.datetime.now())
954 for rrun in recurring_runs:
955 # Create job from template
956 job = rrun.job
957 info = rpc_utils.get_job_info(job)
958
959 host_objects = info['hosts']
960 one_time_hosts = info['one_time_hosts']
961 metahost_objects = info['meta_hosts']
962 dependencies = info['dependencies']
963 atomic_group = info['atomic_group']
964
965 for host in one_time_hosts or []:
966 this_host = models.Host.create_one_time_host(host.hostname)
967 host_objects.append(this_host)
968
969 try:
970 rpc_utils.create_new_job(owner=rrun.owner.login,
971 host_objects=host_objects,
972 metahost_objects=metahost_objects,
973 name=job.name,
974 priority=job.priority,
975 control_file=job.control_file,
976 control_type=job.control_type,
977 is_template=False,
978 synch_count=job.synch_count,
979 timeout=job.timeout,
980 run_verify=job.run_verify,
981 email_list=job.email_list,
982 dependencies=dependencies,
983 reboot_before=job.reboot_before,
984 reboot_after=job.reboot_after,
985 atomic_group=atomic_group)
986
987 except Exception, ex:
988 logging.exception(ex)
989 #TODO send email
990
991 if rrun.loop_count == 1:
992 rrun.delete()
993 else:
994 if rrun.loop_count != 0: # if not infinite loop
995 # calculate new start_date
996 difference = datetime.timedelta(seconds=rrun.loop_period)
997 rrun.start_date = rrun.start_date + difference
998 rrun.loop_count -= 1
999 rrun.save()
1000
1001
showard170873e2009-01-07 00:22:26 +00001002class PidfileRunMonitor(object):
1003 """
1004 Client must call either run() to start a new process or
1005 attach_to_existing_process().
1006 """
mbligh36768f02008-02-22 18:28:33 +00001007
showard170873e2009-01-07 00:22:26 +00001008 class _PidfileException(Exception):
1009 """
1010 Raised when there's some unexpected behavior with the pid file, but only
1011 used internally (never allowed to escape this class).
1012 """
mbligh36768f02008-02-22 18:28:33 +00001013
1014
showard170873e2009-01-07 00:22:26 +00001015 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001016 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001017 self._start_time = None
1018 self.pidfile_id = None
1019 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001020
1021
showard170873e2009-01-07 00:22:26 +00001022 def _add_nice_command(self, command, nice_level):
1023 if not nice_level:
1024 return command
1025 return ['nice', '-n', str(nice_level)] + command
1026
1027
1028 def _set_start_time(self):
1029 self._start_time = time.time()
1030
1031
1032 def run(self, command, working_directory, nice_level=None, log_file=None,
1033 pidfile_name=None, paired_with_pidfile=None):
1034 assert command is not None
1035 if nice_level is not None:
1036 command = ['nice', '-n', str(nice_level)] + command
1037 self._set_start_time()
1038 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001039 command, working_directory, pidfile_name=pidfile_name,
1040 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001041
1042
showardd3dc1992009-04-22 21:01:40 +00001043 def attach_to_existing_process(self, execution_tag,
1044 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001045 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001046 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1047 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001048 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001049
1050
jadmanski0afbb632008-06-06 21:10:57 +00001051 def kill(self):
showard170873e2009-01-07 00:22:26 +00001052 if self.has_process():
1053 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001054
mbligh36768f02008-02-22 18:28:33 +00001055
showard170873e2009-01-07 00:22:26 +00001056 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001057 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001058 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001059
1060
showard170873e2009-01-07 00:22:26 +00001061 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001062 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001063 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001064 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001065
1066
showard170873e2009-01-07 00:22:26 +00001067 def _read_pidfile(self, use_second_read=False):
1068 assert self.pidfile_id is not None, (
1069 'You must call run() or attach_to_existing_process()')
1070 contents = _drone_manager.get_pidfile_contents(
1071 self.pidfile_id, use_second_read=use_second_read)
1072 if contents.is_invalid():
1073 self._state = drone_manager.PidfileContents()
1074 raise self._PidfileException(contents)
1075 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001076
1077
showard21baa452008-10-21 00:08:39 +00001078 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001079 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1080 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001081 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001082 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001083 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001084
1085
1086 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001087 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001088 return
mblighbb421852008-03-11 22:36:16 +00001089
showard21baa452008-10-21 00:08:39 +00001090 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001091
showard170873e2009-01-07 00:22:26 +00001092 if self._state.process is None:
1093 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001094 return
mbligh90a549d2008-03-25 23:52:34 +00001095
showard21baa452008-10-21 00:08:39 +00001096 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001097 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001098 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001099 return
mbligh90a549d2008-03-25 23:52:34 +00001100
showard170873e2009-01-07 00:22:26 +00001101 # pid but no running process - maybe process *just* exited
1102 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001103 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001104 # autoserv exited without writing an exit code
1105 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001106 self._handle_pidfile_error(
1107 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001108
showard21baa452008-10-21 00:08:39 +00001109
1110 def _get_pidfile_info(self):
1111 """\
1112 After completion, self._state will contain:
1113 pid=None, exit_status=None if autoserv has not yet run
1114 pid!=None, exit_status=None if autoserv is running
1115 pid!=None, exit_status!=None if autoserv has completed
1116 """
1117 try:
1118 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001119 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001120 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001121
1122
showard170873e2009-01-07 00:22:26 +00001123 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001124 """\
1125 Called when no pidfile is found or no pid is in the pidfile.
1126 """
showard170873e2009-01-07 00:22:26 +00001127 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001128 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001129 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1130 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001131 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001132 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001133
1134
showard35162b02009-03-03 02:17:30 +00001135 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001136 """\
1137 Called when autoserv has exited without writing an exit status,
1138 or we've timed out waiting for autoserv to write a pid to the
1139 pidfile. In either case, we just return failure and the caller
1140 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001141
showard170873e2009-01-07 00:22:26 +00001142 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001143 """
1144 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001145 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001146 self._state.exit_status = 1
1147 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001151 self._get_pidfile_info()
1152 return self._state.exit_status
1153
1154
1155 def num_tests_failed(self):
1156 self._get_pidfile_info()
1157 assert self._state.num_tests_failed is not None
1158 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001159
1160
mbligh36768f02008-02-22 18:28:33 +00001161class Agent(object):
showard170873e2009-01-07 00:22:26 +00001162 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001163 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001164 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001165 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001166 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001167
showard170873e2009-01-07 00:22:26 +00001168 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1169 for task in tasks)
1170 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1171
showardd3dc1992009-04-22 21:01:40 +00001172 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001173 for task in tasks:
1174 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001175
1176
showardd3dc1992009-04-22 21:01:40 +00001177 def _clear_queue(self):
1178 self.queue = Queue.Queue(0)
1179
1180
showard170873e2009-01-07 00:22:26 +00001181 def _union_ids(self, id_lists):
1182 return set(itertools.chain(*id_lists))
1183
1184
jadmanski0afbb632008-06-06 21:10:57 +00001185 def add_task(self, task):
1186 self.queue.put_nowait(task)
1187 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001188
1189
jadmanski0afbb632008-06-06 21:10:57 +00001190 def tick(self):
showard21baa452008-10-21 00:08:39 +00001191 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001192 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001193 self.active_task.poll()
1194 if not self.active_task.is_done():
1195 return
1196 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001197
1198
jadmanski0afbb632008-06-06 21:10:57 +00001199 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001200 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001201 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001202 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001203 if not self.active_task.success:
1204 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001205 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001206
jadmanski0afbb632008-06-06 21:10:57 +00001207 if not self.is_done():
1208 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001212 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001213 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1214 # get reset.
1215 new_agent = Agent(self.active_task.failure_tasks)
1216 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001217
mblighe2586682008-02-29 22:45:46 +00001218
showard4c5374f2008-09-04 17:02:56 +00001219 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001220 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001221
1222
jadmanski0afbb632008-06-06 21:10:57 +00001223 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001224 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001225
1226
showardd3dc1992009-04-22 21:01:40 +00001227 def abort(self):
showard08a36412009-05-05 01:01:13 +00001228 # abort tasks until the queue is empty or a task ignores the abort
1229 while not self.is_done():
1230 if not self.active_task:
1231 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001232 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001233 if not self.active_task.aborted:
1234 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001235 return
1236 self.active_task = None
1237
showardd3dc1992009-04-22 21:01:40 +00001238
mbligh36768f02008-02-22 18:28:33 +00001239class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001240 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1241 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001242 self.done = False
1243 self.failure_tasks = failure_tasks
1244 self.started = False
1245 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001246 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001247 self.task = None
1248 self.agent = None
1249 self.monitor = None
1250 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001251 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001252 self.queue_entry_ids = []
1253 self.host_ids = []
1254 self.log_file = None
1255
1256
1257 def _set_ids(self, host=None, queue_entries=None):
1258 if queue_entries and queue_entries != [None]:
1259 self.host_ids = [entry.host.id for entry in queue_entries]
1260 self.queue_entry_ids = [entry.id for entry in queue_entries]
1261 else:
1262 assert host
1263 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def poll(self):
showard08a36412009-05-05 01:01:13 +00001267 if not self.started:
1268 self.start()
1269 self.tick()
1270
1271
1272 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001273 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001274 exit_code = self.monitor.exit_code()
1275 if exit_code is None:
1276 return
1277 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001278 else:
1279 success = False
mbligh36768f02008-02-22 18:28:33 +00001280
jadmanski0afbb632008-06-06 21:10:57 +00001281 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001282
1283
jadmanski0afbb632008-06-06 21:10:57 +00001284 def is_done(self):
1285 return self.done
mbligh36768f02008-02-22 18:28:33 +00001286
1287
jadmanski0afbb632008-06-06 21:10:57 +00001288 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001289 if self.done:
1290 return
jadmanski0afbb632008-06-06 21:10:57 +00001291 self.done = True
1292 self.success = success
1293 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001294
1295
jadmanski0afbb632008-06-06 21:10:57 +00001296 def prolog(self):
1297 pass
mblighd64e5702008-04-04 21:39:28 +00001298
1299
jadmanski0afbb632008-06-06 21:10:57 +00001300 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001301 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001302
mbligh36768f02008-02-22 18:28:33 +00001303
jadmanski0afbb632008-06-06 21:10:57 +00001304 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001305 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001306 _drone_manager.copy_to_results_repository(
1307 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001308
1309
jadmanski0afbb632008-06-06 21:10:57 +00001310 def epilog(self):
1311 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001312
1313
jadmanski0afbb632008-06-06 21:10:57 +00001314 def start(self):
1315 assert self.agent
1316
1317 if not self.started:
1318 self.prolog()
1319 self.run()
1320
1321 self.started = True
1322
1323
1324 def abort(self):
1325 if self.monitor:
1326 self.monitor.kill()
1327 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001328 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001329 self.cleanup()
1330
1331
showard170873e2009-01-07 00:22:26 +00001332 def set_host_log_file(self, base_name, host):
1333 filename = '%s.%s' % (time.time(), base_name)
1334 self.log_file = os.path.join('hosts', host.hostname, filename)
1335
1336
showardde634ee2009-01-30 01:44:24 +00001337 def _get_consistent_execution_tag(self, queue_entries):
1338 first_execution_tag = queue_entries[0].execution_tag()
1339 for queue_entry in queue_entries[1:]:
1340 assert queue_entry.execution_tag() == first_execution_tag, (
1341 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1342 queue_entry,
1343 first_execution_tag,
1344 queue_entries[0]))
1345 return first_execution_tag
1346
1347
showard6b733412009-04-27 20:09:18 +00001348 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001349 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001350 if use_monitor is None:
1351 assert self.monitor
1352 use_monitor = self.monitor
1353 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001354 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001355 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001356 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001357 results_path)
showardde634ee2009-01-30 01:44:24 +00001358
1359 reparse_task = FinalReparseTask(queue_entries)
1360 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1361
1362
showardd3dc1992009-04-22 21:01:40 +00001363 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001364 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001365 self.monitor = PidfileRunMonitor()
1366 self.monitor.run(self.cmd, self._working_directory,
1367 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001368 log_file=self.log_file,
1369 pidfile_name=pidfile_name,
1370 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001371
1372
showardd9205182009-04-27 20:09:55 +00001373class TaskWithJobKeyvals(object):
1374 """AgentTask mixin providing functionality to help with job keyval files."""
1375 _KEYVAL_FILE = 'keyval'
1376 def _format_keyval(self, key, value):
1377 return '%s=%s' % (key, value)
1378
1379
1380 def _keyval_path(self):
1381 """Subclasses must override this"""
1382 raise NotImplemented
1383
1384
1385 def _write_keyval_after_job(self, field, value):
1386 assert self.monitor
1387 if not self.monitor.has_process():
1388 return
1389 _drone_manager.write_lines_to_file(
1390 self._keyval_path(), [self._format_keyval(field, value)],
1391 paired_with_process=self.monitor.get_process())
1392
1393
1394 def _job_queued_keyval(self, job):
1395 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1396
1397
1398 def _write_job_finished(self):
1399 self._write_keyval_after_job("job_finished", int(time.time()))
1400
1401
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host; optionally fails a queue entry."""
    def __init__(self, host, queue_entry=None):
        """
        @param host: host to repair.
        @param queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry now; if the repair later fails we'll explicitly
        # fail it in _fail_queue_entry()
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals go into our temporary results dir (see TaskWithJobKeyvals)
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the associated queue entry failed and publish its results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001470
1471
showard8fe93b52008-11-18 17:53:22 +00001472class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001473 def epilog(self):
1474 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001475 should_copy_results = (self.queue_entry and not self.success
1476 and not self.queue_entry.meta_host)
1477 if should_copy_results:
1478 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001479 destination = os.path.join(self.queue_entry.execution_tag(),
1480 os.path.basename(self.log_file))
1481 _drone_manager.copy_to_results_repository(
1482 self.monitor.get_process(), self.log_file,
1483 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001484
1485
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host; falls back to RepairTask on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry or host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # if verify fails, try repairing the host before giving up
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        # NOTE(review): passes the raw 'host' argument (None when constructed
        # from a queue_entry) rather than self.host -- confirm intended
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001516
1517
showardd9205182009-04-27 20:09:55 +00001518class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001519 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001520 self.job = job
1521 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001522 super(QueueTask, self).__init__(cmd, self._execution_tag())
1523 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001524
1525
showard73ec0442009-02-07 02:05:20 +00001526 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001527 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001528
1529
1530 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1531 keyval_contents = '\n'.join(self._format_keyval(key, value)
1532 for key, value in keyval_dict.iteritems())
1533 # always end with a newline to allow additional keyvals to be written
1534 keyval_contents += '\n'
1535 _drone_manager.attach_file_to_execution(self._execution_tag(),
1536 keyval_contents,
1537 file_path=keyval_path)
1538
1539
1540 def _write_keyvals_before_job(self, keyval_dict):
1541 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1542
1543
showard170873e2009-01-07 00:22:26 +00001544 def _write_host_keyvals(self, host):
1545 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1546 host.hostname)
1547 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001548 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1549 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001550
1551
showard170873e2009-01-07 00:22:26 +00001552 def _execution_tag(self):
1553 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001554
1555
jadmanski0afbb632008-06-06 21:10:57 +00001556 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001557 queued_key, queued_time = self._job_queued_keyval(self.job)
1558 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001559 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001560 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001561 queue_entry.set_status('Running')
1562 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001563 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001564 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001565 assert len(self.queue_entries) == 1
1566 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001567
1568
showard35162b02009-03-03 02:17:30 +00001569 def _write_lost_process_error_file(self):
1570 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1571 _drone_manager.write_lines_to_file(error_file_path,
1572 [_LOST_PROCESS_ERROR])
1573
1574
showardd3dc1992009-04-22 21:01:40 +00001575 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001576 if not self.monitor:
1577 return
1578
showardd9205182009-04-27 20:09:55 +00001579 self._write_job_finished()
1580
showardd3dc1992009-04-22 21:01:40 +00001581 # both of these conditionals can be true, iff the process ran, wrote a
1582 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001583 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001584 gather_task = GatherLogsTask(self.job, self.queue_entries)
1585 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001586
1587 if self.monitor.lost_process:
1588 self._write_lost_process_error_file()
1589 for queue_entry in self.queue_entries:
1590 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001591
1592
showardcbd74612008-11-19 21:42:02 +00001593 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001594 _drone_manager.write_lines_to_file(
1595 os.path.join(self._execution_tag(), 'status.log'),
1596 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001597 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001598
1599
jadmanskif7fa2cc2008-10-01 14:13:23 +00001600 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001601 if not self.monitor or not self.monitor.has_process():
1602 return
1603
jadmanskif7fa2cc2008-10-01 14:13:23 +00001604 # build up sets of all the aborted_by and aborted_on values
1605 aborted_by, aborted_on = set(), set()
1606 for queue_entry in self.queue_entries:
1607 if queue_entry.aborted_by:
1608 aborted_by.add(queue_entry.aborted_by)
1609 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1610 aborted_on.add(t)
1611
1612 # extract some actual, unique aborted by value and write it out
1613 assert len(aborted_by) <= 1
1614 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001615 aborted_by_value = aborted_by.pop()
1616 aborted_on_value = max(aborted_on)
1617 else:
1618 aborted_by_value = 'autotest_system'
1619 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001620
showarda0382352009-02-11 23:36:43 +00001621 self._write_keyval_after_job("aborted_by", aborted_by_value)
1622 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001623
showardcbd74612008-11-19 21:42:02 +00001624 aborted_on_string = str(datetime.datetime.fromtimestamp(
1625 aborted_on_value))
1626 self._write_status_comment('Job aborted by %s on %s' %
1627 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001628
1629
jadmanski0afbb632008-06-06 21:10:57 +00001630 def abort(self):
1631 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001632 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001633 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001634
1635
jadmanski0afbb632008-06-06 21:10:57 +00001636 def epilog(self):
1637 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001638 self._finish_task()
1639 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001640
1641
mblighbb421852008-03-11 22:36:16 +00001642class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001643 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001644 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001645 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001646
1647
jadmanski0afbb632008-06-06 21:10:57 +00001648 def run(self):
1649 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001650
1651
jadmanski0afbb632008-06-06 21:10:57 +00001652 def prolog(self):
1653 # recovering an existing process - don't do prolog
1654 pass
mblighbb421852008-03-11 22:36:16 +00001655
1656
showardd3dc1992009-04-22 21:01:40 +00001657class PostJobTask(AgentTask):
1658 def __init__(self, queue_entries, pidfile_name, logfile_name,
1659 run_monitor=None):
1660 """
1661 If run_monitor != None, we're recovering a running task.
1662 """
1663 self._queue_entries = queue_entries
1664 self._pidfile_name = pidfile_name
1665 self._run_monitor = run_monitor
1666
1667 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1668 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1669 self._autoserv_monitor = PidfileRunMonitor()
1670 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1671 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1672
1673 if _testing_mode:
1674 command = 'true'
1675 else:
1676 command = self._generate_command(self._results_dir)
1677
1678 super(PostJobTask, self).__init__(cmd=command,
1679 working_directory=self._execution_tag)
1680
1681 self.log_file = os.path.join(self._execution_tag, logfile_name)
1682 self._final_status = self._determine_final_status()
1683
1684
1685 def _generate_command(self, results_dir):
1686 raise NotImplementedError('Subclasses must override this')
1687
1688
1689 def _job_was_aborted(self):
1690 was_aborted = None
1691 for queue_entry in self._queue_entries:
1692 queue_entry.update_from_database()
1693 if was_aborted is None: # first queue entry
1694 was_aborted = bool(queue_entry.aborted)
1695 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1696 email_manager.manager.enqueue_notify_email(
1697 'Inconsistent abort state',
1698 'Queue entries have inconsistent abort state: ' +
1699 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1700 # don't crash here, just assume true
1701 return True
1702 return was_aborted
1703
1704
1705 def _determine_final_status(self):
1706 if self._job_was_aborted():
1707 return models.HostQueueEntry.Status.ABORTED
1708
1709 # we'll use a PidfileRunMonitor to read the autoserv exit status
1710 if self._autoserv_monitor.exit_code() == 0:
1711 return models.HostQueueEntry.Status.COMPLETED
1712 return models.HostQueueEntry.Status.FAILED
1713
1714
1715 def run(self):
1716 if self._run_monitor is not None:
1717 self.monitor = self._run_monitor
1718 else:
1719 # make sure we actually have results to work with.
1720 # this should never happen in normal operation.
1721 if not self._autoserv_monitor.has_process():
1722 email_manager.manager.enqueue_notify_email(
1723 'No results in post-job task',
1724 'No results in post-job task at %s' %
1725 self._autoserv_monitor.pidfile_id)
1726 self.finished(False)
1727 return
1728
1729 super(PostJobTask, self).run(
1730 pidfile_name=self._pidfile_name,
1731 paired_with_pidfile=self._paired_with_pidfile)
1732
1733
1734 def _set_all_statuses(self, status):
1735 for queue_entry in self._queue_entries:
1736 queue_entry.set_status(status)
1737
1738
1739 def abort(self):
1740 # override AgentTask.abort() to avoid killing the process and ending
1741 # the task. post-job tasks continue when the job is aborted.
1742 pass
1743
1744
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        # run_monitor: when not None, we're recovering a running task
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in --collect-crashinfo mode over all the job's hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or free each host per the job's
        reboot_after setting and the final job status."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # always reboot after an aborted job
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        if self._final_status == models.HostQueueEntry.Status.COMPLETED:
            # don't run at all if Autoserv exited successfully
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
1809
1810
showard8fe93b52008-11-18 17:53:22 +00001811class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001812 def __init__(self, host=None, queue_entry=None):
1813 assert bool(host) ^ bool(queue_entry)
1814 if queue_entry:
1815 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001816 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001817 self.host = host
showard170873e2009-01-07 00:22:26 +00001818
1819 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001820 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1821 ['--cleanup'],
1822 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001823 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001824 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1825 failure_tasks=[repair_task])
1826
1827 self._set_ids(host=host, queue_entries=[queue_entry])
1828 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001829
mblighd5c95802008-03-05 00:33:46 +00001830
jadmanski0afbb632008-06-06 21:10:57 +00001831 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001832 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001833 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001834 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001835
mblighd5c95802008-03-05 00:33:46 +00001836
showard21baa452008-10-21 00:08:39 +00001837 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001838 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001839 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001840 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001841 self.host.update_field('dirty', 0)
1842
1843
showardd3dc1992009-04-22 21:01:40 +00001844class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001845 _num_running_parses = 0
1846
showardd3dc1992009-04-22 21:01:40 +00001847 def __init__(self, queue_entries, run_monitor=None):
1848 super(FinalReparseTask, self).__init__(queue_entries,
1849 pidfile_name=_PARSER_PID_FILE,
1850 logfile_name='.parse.log',
1851 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001852 # don't use _set_ids, since we don't want to set the host_ids
1853 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001854 self._parse_started = False
1855
showard97aed502008-11-04 02:01:24 +00001856
1857 @classmethod
1858 def _increment_running_parses(cls):
1859 cls._num_running_parses += 1
1860
1861
1862 @classmethod
1863 def _decrement_running_parses(cls):
1864 cls._num_running_parses -= 1
1865
1866
1867 @classmethod
1868 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001869 return (cls._num_running_parses <
1870 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001871
1872
1873 def prolog(self):
1874 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001875 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001876
1877
1878 def epilog(self):
1879 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001880 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001881
1882
showardd3dc1992009-04-22 21:01:40 +00001883 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001884 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001885 results_dir]
showard97aed502008-11-04 02:01:24 +00001886
1887
showard08a36412009-05-05 01:01:13 +00001888 def tick(self):
1889 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001890 # and we can, at which point we revert to default behavior
1891 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001892 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001893 else:
1894 self._try_starting_parse()
1895
1896
1897 def run(self):
1898 # override run() to not actually run unless we can
1899 self._try_starting_parse()
1900
1901
1902 def _try_starting_parse(self):
1903 if not self._can_run_new_parse():
1904 return
showard170873e2009-01-07 00:22:26 +00001905
showard97aed502008-11-04 02:01:24 +00001906 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001907 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001908
showard97aed502008-11-04 02:01:24 +00001909 self._increment_running_parses()
1910 self._parse_started = True
1911
1912
1913 def finished(self, success):
1914 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001915 if self._parse_started:
1916 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001917
1918
showardc9ae1782009-01-30 01:42:37 +00001919class SetEntryPendingTask(AgentTask):
1920 def __init__(self, queue_entry):
1921 super(SetEntryPendingTask, self).__init__(cmd='')
1922 self._queue_entry = queue_entry
1923 self._set_ids(queue_entries=[queue_entry])
1924
1925
1926 def run(self):
1927 agent = self._queue_entry.on_pending()
1928 if agent:
1929 self.agent.dispatcher.add_agent(agent)
1930 self.finished(True)
1931
1932
showarda3c58572009-03-12 20:36:59 +00001933class DBError(Exception):
1934 """Raised by the DBObject constructor when its select fails."""
1935
1936
mbligh36768f02008-02-22 18:28:33 +00001937class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001938 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001939
1940 # Subclasses MUST override these:
1941 _table_name = ''
1942 _fields = ()
1943
showarda3c58572009-03-12 20:36:59 +00001944 # A mapping from (type, id) to the instance of the object for that
1945 # particular id. This prevents us from creating new Job() and Host()
1946 # instances for every HostQueueEntry object that we instantiate as
1947 # multiple HQEs often share the same Job.
1948 _instances_by_type_and_id = weakref.WeakValueDictionary()
1949 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001950
showarda3c58572009-03-12 20:36:59 +00001951
1952 def __new__(cls, id=None, **kwargs):
1953 """
1954 Look to see if we already have an instance for this particular type
1955 and id. If so, use it instead of creating a duplicate instance.
1956 """
1957 if id is not None:
1958 instance = cls._instances_by_type_and_id.get((cls, id))
1959 if instance:
1960 return instance
1961 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1962
1963
1964 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001965 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001966 assert self._table_name, '_table_name must be defined in your class'
1967 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001968 if not new_record:
1969 if self._initialized and not always_query:
1970 return # We've already been initialized.
1971 if id is None:
1972 id = row[0]
1973 # Tell future constructors to use us instead of re-querying while
1974 # this instance is still around.
1975 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001976
showard6ae5ea92009-02-25 00:11:51 +00001977 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001978
jadmanski0afbb632008-06-06 21:10:57 +00001979 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001980
jadmanski0afbb632008-06-06 21:10:57 +00001981 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001982 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001983
showarda3c58572009-03-12 20:36:59 +00001984 if self._initialized:
1985 differences = self._compare_fields_in_row(row)
1986 if differences:
showard7629f142009-03-27 21:02:02 +00001987 logging.warn(
1988 'initialized %s %s instance requery is updating: %s',
1989 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001990 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001991 self._initialized = True
1992
1993
1994 @classmethod
1995 def _clear_instance_cache(cls):
1996 """Used for testing, clear the internal instance cache."""
1997 cls._instances_by_type_and_id.clear()
1998
1999
showardccbd6c52009-03-21 00:10:21 +00002000 def _fetch_row_from_db(self, row_id):
2001 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2002 rows = _db.execute(sql, (row_id,))
2003 if not rows:
showard76e29d12009-04-15 21:53:10 +00002004 raise DBError("row not found (table=%s, row id=%s)"
2005 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002006 return rows[0]
2007
2008
showarda3c58572009-03-12 20:36:59 +00002009 def _assert_row_length(self, row):
2010 assert len(row) == len(self._fields), (
2011 "table = %s, row = %s/%d, fields = %s/%d" % (
2012 self.__table, row, len(row), self._fields, len(self._fields)))
2013
2014
2015 def _compare_fields_in_row(self, row):
2016 """
2017 Given a row as returned by a SELECT query, compare it to our existing
2018 in memory fields.
2019
2020 @param row - A sequence of values corresponding to fields named in
2021 The class attribute _fields.
2022
2023 @returns A dictionary listing the differences keyed by field name
2024 containing tuples of (current_value, row_value).
2025 """
2026 self._assert_row_length(row)
2027 differences = {}
2028 for field, row_value in itertools.izip(self._fields, row):
2029 current_value = getattr(self, field)
2030 if current_value != row_value:
2031 differences[field] = (current_value, row_value)
2032 return differences
showard2bab8f42008-11-12 18:15:22 +00002033
2034
2035 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002036 """
2037 Update our field attributes using a single row returned by SELECT.
2038
2039 @param row - A sequence of values corresponding to fields named in
2040 the class fields list.
2041 """
2042 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002043
showard2bab8f42008-11-12 18:15:22 +00002044 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002045 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002046 setattr(self, field, value)
2047 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002048
showard2bab8f42008-11-12 18:15:22 +00002049 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002050
mblighe2586682008-02-29 22:45:46 +00002051
showardccbd6c52009-03-21 00:10:21 +00002052 def update_from_database(self):
2053 assert self.id is not None
2054 row = self._fetch_row_from_db(self.id)
2055 self._update_fields_from_row(row)
2056
2057
jadmanski0afbb632008-06-06 21:10:57 +00002058 def count(self, where, table = None):
2059 if not table:
2060 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002061
jadmanski0afbb632008-06-06 21:10:57 +00002062 rows = _db.execute("""
2063 SELECT count(*) FROM %s
2064 WHERE %s
2065 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002066
jadmanski0afbb632008-06-06 21:10:57 +00002067 assert len(rows) == 1
2068
2069 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002070
2071
showardd3dc1992009-04-22 21:01:40 +00002072 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002073 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002074
showard2bab8f42008-11-12 18:15:22 +00002075 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002076 return
mbligh36768f02008-02-22 18:28:33 +00002077
mblighf8c624d2008-07-03 16:58:45 +00002078 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002079 _db.execute(query, (value, self.id))
2080
showard2bab8f42008-11-12 18:15:22 +00002081 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002082
2083
jadmanski0afbb632008-06-06 21:10:57 +00002084 def save(self):
2085 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002086 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002087 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002088 values = []
2089 for key in keys:
2090 value = getattr(self, key)
2091 if value is None:
2092 values.append('NULL')
2093 else:
2094 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002095 values_str = ','.join(values)
2096 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2097 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002098 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002099 # Update our id to the one the database just assigned to us.
2100 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002101
2102
jadmanski0afbb632008-06-06 21:10:57 +00002103 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002104 self._instances_by_type_and_id.pop((type(self), id), None)
2105 self._initialized = False
2106 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002107 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2108 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002109
2110
showard63a34772008-08-18 19:32:50 +00002111 @staticmethod
2112 def _prefix_with(string, prefix):
2113 if string:
2114 string = prefix + string
2115 return string
2116
2117
jadmanski0afbb632008-06-06 21:10:57 +00002118 @classmethod
showard989f25d2008-10-01 11:38:11 +00002119 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002120 """
2121 Construct instances of our class based on the given database query.
2122
2123 @yields One class instance for each row fetched.
2124 """
showard63a34772008-08-18 19:32:50 +00002125 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2126 where = cls._prefix_with(where, 'WHERE ')
2127 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002128 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002129 'joins' : joins,
2130 'where' : where,
2131 'order_by' : order_by})
2132 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002133 for row in rows:
2134 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002135
mbligh36768f02008-02-22 18:28:33 +00002136
class IneligibleHostQueue(DBObject):
    """
    Row wrapper for `ineligible_host_queues`: records a (job, host) pair
    that must not be scheduled together again (created by block_host()).
    """
    _table_name = 'ineligible_host_queues'
    _fields = (
        'id',
        'job_id',
        'host_id',
    )
showard04c82c52008-05-29 19:38:12 +00002140
2141
showard89f84db2009-03-12 20:39:13 +00002142class AtomicGroup(DBObject):
2143 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002144 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2145 'invalid')
showard89f84db2009-03-12 20:39:13 +00002146
2147
showard989f25d2008-10-01 11:38:11 +00002148class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002149 _table_name = 'labels'
2150 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002151 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002152
2153
mbligh36768f02008-02-22 18:28:33 +00002154class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002155 _table_name = 'hosts'
2156 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2157 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2158
2159
jadmanski0afbb632008-06-06 21:10:57 +00002160 def current_task(self):
2161 rows = _db.execute("""
2162 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2163 """, (self.id,))
2164
2165 if len(rows) == 0:
2166 return None
2167 else:
2168 assert len(rows) == 1
2169 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002170 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002171
2172
jadmanski0afbb632008-06-06 21:10:57 +00002173 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002174 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002175 if self.current_task():
2176 self.current_task().requeue()
2177
showard6ae5ea92009-02-25 00:11:51 +00002178
jadmanski0afbb632008-06-06 21:10:57 +00002179 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002180 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002181 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002182
2183
showard170873e2009-01-07 00:22:26 +00002184 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002185 """
showard170873e2009-01-07 00:22:26 +00002186 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002187 """
2188 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002189 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002190 FROM labels
2191 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002192 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002193 ORDER BY labels.name
2194 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002195 platform = None
2196 all_labels = []
2197 for label_name, is_platform in rows:
2198 if is_platform:
2199 platform = label_name
2200 all_labels.append(label_name)
2201 return platform, all_labels
2202
2203
2204 def reverify_tasks(self):
2205 cleanup_task = CleanupTask(host=self)
2206 verify_task = VerifyTask(host=self)
2207 # just to make sure this host does not get taken away
2208 self.set_status('Cleaning')
2209 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002210
2211
mbligh36768f02008-02-22 18:28:33 +00002212class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002213 _table_name = 'host_queue_entries'
2214 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002215 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002216 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002217
2218
showarda3c58572009-03-12 20:36:59 +00002219 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002220 assert id or row
showarda3c58572009-03-12 20:36:59 +00002221 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002222 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002223
jadmanski0afbb632008-06-06 21:10:57 +00002224 if self.host_id:
2225 self.host = Host(self.host_id)
2226 else:
2227 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002228
showard170873e2009-01-07 00:22:26 +00002229 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002230 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002231
2232
showard89f84db2009-03-12 20:39:13 +00002233 @classmethod
2234 def clone(cls, template):
2235 """
2236 Creates a new row using the values from a template instance.
2237
2238 The new instance will not exist in the database or have a valid
2239 id attribute until its save() method is called.
2240 """
2241 assert isinstance(template, cls)
2242 new_row = [getattr(template, field) for field in cls._fields]
2243 clone = cls(row=new_row, new_record=True)
2244 clone.id = None
2245 return clone
2246
2247
showardc85c21b2008-11-24 22:17:37 +00002248 def _view_job_url(self):
2249 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2250
2251
jadmanski0afbb632008-06-06 21:10:57 +00002252 def set_host(self, host):
2253 if host:
2254 self.queue_log_record('Assigning host ' + host.hostname)
2255 self.update_field('host_id', host.id)
2256 self.update_field('active', True)
2257 self.block_host(host.id)
2258 else:
2259 self.queue_log_record('Releasing host')
2260 self.unblock_host(self.host.id)
2261 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002262
jadmanski0afbb632008-06-06 21:10:57 +00002263 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002264
2265
jadmanski0afbb632008-06-06 21:10:57 +00002266 def get_host(self):
2267 return self.host
mbligh36768f02008-02-22 18:28:33 +00002268
2269
jadmanski0afbb632008-06-06 21:10:57 +00002270 def queue_log_record(self, log_line):
2271 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002272 _drone_manager.write_lines_to_file(self.queue_log_path,
2273 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002274
2275
jadmanski0afbb632008-06-06 21:10:57 +00002276 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002277 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002278 row = [0, self.job.id, host_id]
2279 block = IneligibleHostQueue(row=row, new_record=True)
2280 block.save()
mblighe2586682008-02-29 22:45:46 +00002281
2282
jadmanski0afbb632008-06-06 21:10:57 +00002283 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002284 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002285 blocks = IneligibleHostQueue.fetch(
2286 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2287 for block in blocks:
2288 block.delete()
mblighe2586682008-02-29 22:45:46 +00002289
2290
showard2bab8f42008-11-12 18:15:22 +00002291 def set_execution_subdir(self, subdir=None):
2292 if subdir is None:
2293 assert self.get_host()
2294 subdir = self.get_host().hostname
2295 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002296
2297
showard6355f6b2008-12-05 18:52:13 +00002298 def _get_hostname(self):
2299 if self.host:
2300 return self.host.hostname
2301 return 'no host'
2302
2303
showard170873e2009-01-07 00:22:26 +00002304 def __str__(self):
2305 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2306
2307
jadmanski0afbb632008-06-06 21:10:57 +00002308 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002309 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002310
showardb18134f2009-03-20 20:52:18 +00002311 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002312
showardc85c21b2008-11-24 22:17:37 +00002313 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002314 self.update_field('complete', False)
2315 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002316
jadmanski0afbb632008-06-06 21:10:57 +00002317 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002318 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002319 self.update_field('complete', False)
2320 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002321
showardc85c21b2008-11-24 22:17:37 +00002322 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002323 self.update_field('complete', True)
2324 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002325
2326 should_email_status = (status.lower() in _notify_email_statuses or
2327 'all' in _notify_email_statuses)
2328 if should_email_status:
2329 self._email_on_status(status)
2330
2331 self._email_on_job_complete()
2332
2333
2334 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002335 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002336
2337 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2338 self.job.id, self.job.name, hostname, status)
2339 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2340 self.job.id, self.job.name, hostname, status,
2341 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002342 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002343
2344
2345 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002346 if not self.job.is_finished():
2347 return
showard542e8402008-09-19 20:16:18 +00002348
showardc85c21b2008-11-24 22:17:37 +00002349 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002350 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002351 for queue_entry in hosts_queue:
2352 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002353 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002354 queue_entry.status))
2355
2356 summary_text = "\n".join(summary_text)
2357 status_counts = models.Job.objects.get_status_counts(
2358 [self.job.id])[self.job.id]
2359 status = ', '.join('%d %s' % (count, status) for status, count
2360 in status_counts.iteritems())
2361
2362 subject = 'Autotest: Job ID: %s "%s" %s' % (
2363 self.job.id, self.job.name, status)
2364 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2365 self.job.id, self.job.name, status, self._view_job_url(),
2366 summary_text)
showard170873e2009-01-07 00:22:26 +00002367 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002368
2369
showard89f84db2009-03-12 20:39:13 +00002370 def run(self, assigned_host=None):
2371 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002372 assert assigned_host
2373 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002374 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002375
showardb18134f2009-03-20 20:52:18 +00002376 logging.info("%s/%s/%s scheduled on %s, status=%s",
2377 self.job.name, self.meta_host, self.atomic_group_id,
2378 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002379
jadmanski0afbb632008-06-06 21:10:57 +00002380 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002381
showard6ae5ea92009-02-25 00:11:51 +00002382
jadmanski0afbb632008-06-06 21:10:57 +00002383 def requeue(self):
2384 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002385 # verify/cleanup failure sets the execution subdir, so reset it here
2386 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002387 if self.meta_host:
2388 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002389
2390
jadmanski0afbb632008-06-06 21:10:57 +00002391 def handle_host_failure(self):
2392 """\
2393 Called when this queue entry's host has failed verification and
2394 repair.
2395 """
2396 assert not self.meta_host
2397 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002398 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002399
2400
jadmanskif7fa2cc2008-10-01 14:13:23 +00002401 @property
2402 def aborted_by(self):
2403 self._load_abort_info()
2404 return self._aborted_by
2405
2406
2407 @property
2408 def aborted_on(self):
2409 self._load_abort_info()
2410 return self._aborted_on
2411
2412
2413 def _load_abort_info(self):
2414 """ Fetch info about who aborted the job. """
2415 if hasattr(self, "_aborted_by"):
2416 return
2417 rows = _db.execute("""
2418 SELECT users.login, aborted_host_queue_entries.aborted_on
2419 FROM aborted_host_queue_entries
2420 INNER JOIN users
2421 ON users.id = aborted_host_queue_entries.aborted_by_id
2422 WHERE aborted_host_queue_entries.queue_entry_id = %s
2423 """, (self.id,))
2424 if rows:
2425 self._aborted_by, self._aborted_on = rows[0]
2426 else:
2427 self._aborted_by = self._aborted_on = None
2428
2429
showardb2e2c322008-10-14 17:33:55 +00002430 def on_pending(self):
2431 """
2432 Called when an entry in a synchronous job has passed verify. If the
2433 job is ready to run, returns an agent to run the job. Returns None
2434 otherwise.
2435 """
2436 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002437 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002438 if self.job.is_ready():
2439 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002440 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002441 return None
2442
2443
showardd3dc1992009-04-22 21:01:40 +00002444 def abort(self, dispatcher):
2445 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002446
showardd3dc1992009-04-22 21:01:40 +00002447 Status = models.HostQueueEntry.Status
2448 has_running_job_agent = (
2449 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2450 and dispatcher.get_agents_for_entry(self))
2451 if has_running_job_agent:
2452 # do nothing; post-job tasks will finish and then mark this entry
2453 # with status "Aborted" and take care of the host
2454 return
2455
2456 if self.status in (Status.STARTING, Status.PENDING):
2457 self.host.set_status(models.Host.Status.READY)
2458 elif self.status == Status.VERIFYING:
2459 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2460
2461 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002462
2463 def execution_tag(self):
2464 assert self.execution_subdir
2465 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002466
2467
mbligh36768f02008-02-22 18:28:33 +00002468class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002469 _table_name = 'jobs'
2470 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2471 'control_type', 'created_on', 'synch_count', 'timeout',
2472 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2473
2474
showarda3c58572009-03-12 20:36:59 +00002475 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002476 assert id or row
showarda3c58572009-03-12 20:36:59 +00002477 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002478
mblighe2586682008-02-29 22:45:46 +00002479
jadmanski0afbb632008-06-06 21:10:57 +00002480 def is_server_job(self):
2481 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002482
2483
showard170873e2009-01-07 00:22:26 +00002484 def tag(self):
2485 return "%s-%s" % (self.id, self.owner)
2486
2487
jadmanski0afbb632008-06-06 21:10:57 +00002488 def get_host_queue_entries(self):
2489 rows = _db.execute("""
2490 SELECT * FROM host_queue_entries
2491 WHERE job_id= %s
2492 """, (self.id,))
2493 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002494
jadmanski0afbb632008-06-06 21:10:57 +00002495 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002496
jadmanski0afbb632008-06-06 21:10:57 +00002497 return entries
mbligh36768f02008-02-22 18:28:33 +00002498
2499
jadmanski0afbb632008-06-06 21:10:57 +00002500 def set_status(self, status, update_queues=False):
2501 self.update_field('status',status)
2502
2503 if update_queues:
2504 for queue_entry in self.get_host_queue_entries():
2505 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002506
2507
jadmanski0afbb632008-06-06 21:10:57 +00002508 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002509 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2510 status='Pending')
2511 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002512
2513
jadmanski0afbb632008-06-06 21:10:57 +00002514 def num_machines(self, clause = None):
2515 sql = "job_id=%s" % self.id
2516 if clause:
2517 sql += " AND (%s)" % clause
2518 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002519
2520
jadmanski0afbb632008-06-06 21:10:57 +00002521 def num_queued(self):
2522 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002523
2524
jadmanski0afbb632008-06-06 21:10:57 +00002525 def num_active(self):
2526 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002527
2528
jadmanski0afbb632008-06-06 21:10:57 +00002529 def num_complete(self):
2530 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002531
2532
jadmanski0afbb632008-06-06 21:10:57 +00002533 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002534 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002535
mbligh36768f02008-02-22 18:28:33 +00002536
showard6bb7c292009-01-30 01:44:51 +00002537 def _not_yet_run_entries(self, include_verifying=True):
2538 statuses = [models.HostQueueEntry.Status.QUEUED,
2539 models.HostQueueEntry.Status.PENDING]
2540 if include_verifying:
2541 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2542 return models.HostQueueEntry.objects.filter(job=self.id,
2543 status__in=statuses)
2544
2545
2546 def _stop_all_entries(self):
2547 entries_to_stop = self._not_yet_run_entries(
2548 include_verifying=False)
2549 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002550 assert not child_entry.complete, (
2551 '%s status=%s, active=%s, complete=%s' %
2552 (child_entry.id, child_entry.status, child_entry.active,
2553 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002554 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2555 child_entry.host.status = models.Host.Status.READY
2556 child_entry.host.save()
2557 child_entry.status = models.HostQueueEntry.Status.STOPPED
2558 child_entry.save()
2559
showard2bab8f42008-11-12 18:15:22 +00002560 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002561 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002562 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002563 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002564
2565
jadmanski0afbb632008-06-06 21:10:57 +00002566 def write_to_machines_file(self, queue_entry):
2567 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002568 file_path = os.path.join(self.tag(), '.machines')
2569 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002570
2571
showard2bab8f42008-11-12 18:15:22 +00002572 def _next_group_name(self):
2573 query = models.HostQueueEntry.objects.filter(
2574 job=self.id).values('execution_subdir').distinct()
2575 subdirs = (entry['execution_subdir'] for entry in query)
2576 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2577 ids = [int(match.group(1)) for match in groups if match]
2578 if ids:
2579 next_id = max(ids) + 1
2580 else:
2581 next_id = 0
2582 return "group%d" % next_id
2583
2584
showard170873e2009-01-07 00:22:26 +00002585 def _write_control_file(self, execution_tag):
2586 control_path = _drone_manager.attach_file_to_execution(
2587 execution_tag, self.control_file)
2588 return control_path
mbligh36768f02008-02-22 18:28:33 +00002589
showardb2e2c322008-10-14 17:33:55 +00002590
showard2bab8f42008-11-12 18:15:22 +00002591 def get_group_entries(self, queue_entry_from_group):
2592 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002593 return list(HostQueueEntry.fetch(
2594 where='job_id=%s AND execution_subdir=%s',
2595 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002596
2597
showardb2e2c322008-10-14 17:33:55 +00002598 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002599 assert queue_entries
2600 execution_tag = queue_entries[0].execution_tag()
2601 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002602 hostnames = ','.join([entry.get_host().hostname
2603 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002604
showard87ba02a2009-04-20 19:37:32 +00002605 params = _autoserv_command_line(
2606 hostnames, execution_tag,
2607 ['-P', execution_tag, '-n',
2608 _drone_manager.absolute_path(control_path)],
2609 job=self)
mbligh36768f02008-02-22 18:28:33 +00002610
jadmanski0afbb632008-06-06 21:10:57 +00002611 if not self.is_server_job():
2612 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002613
showardb2e2c322008-10-14 17:33:55 +00002614 return params
mblighe2586682008-02-29 22:45:46 +00002615
mbligh36768f02008-02-22 18:28:33 +00002616
showardc9ae1782009-01-30 01:42:37 +00002617 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002618 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002619 return True
showard0fc38302008-10-23 00:44:07 +00002620 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002621 return queue_entry.get_host().dirty
2622 return False
showard21baa452008-10-21 00:08:39 +00002623
showardc9ae1782009-01-30 01:42:37 +00002624
2625 def _should_run_verify(self, queue_entry):
2626 do_not_verify = (queue_entry.host.protection ==
2627 host_protections.Protection.DO_NOT_VERIFY)
2628 if do_not_verify:
2629 return False
2630 return self.run_verify
2631
2632
2633 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002634 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002635 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002636 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002637 if self._should_run_verify(queue_entry):
2638 tasks.append(VerifyTask(queue_entry=queue_entry))
2639 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002640 return tasks
2641
2642
showard2bab8f42008-11-12 18:15:22 +00002643 def _assign_new_group(self, queue_entries):
2644 if len(queue_entries) == 1:
2645 group_name = queue_entries[0].get_host().hostname
2646 else:
2647 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002648 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002649 self.id, [entry.host.hostname for entry in queue_entries],
2650 group_name)
2651
2652 for queue_entry in queue_entries:
2653 queue_entry.set_execution_subdir(group_name)
2654
2655
2656 def _choose_group_to_run(self, include_queue_entry):
2657 chosen_entries = [include_queue_entry]
2658
2659 num_entries_needed = self.synch_count - 1
2660 if num_entries_needed > 0:
2661 pending_entries = HostQueueEntry.fetch(
2662 where='job_id = %s AND status = "Pending" AND id != %s',
2663 params=(self.id, include_queue_entry.id))
2664 chosen_entries += list(pending_entries)[:num_entries_needed]
2665
2666 self._assign_new_group(chosen_entries)
2667 return chosen_entries
2668
2669
2670 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002671 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002672 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2673 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002674
showard2bab8f42008-11-12 18:15:22 +00002675 queue_entries = self._choose_group_to_run(queue_entry)
2676 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002677
2678
2679 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002680 for queue_entry in queue_entries:
2681 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002682 params = self._get_autoserv_params(queue_entries)
2683 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2684 cmd=params)
2685 tasks = initial_tasks + [queue_task]
2686 entry_ids = [entry.id for entry in queue_entries]
2687
showard170873e2009-01-07 00:22:26 +00002688 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002689
2690
mbligh36768f02008-02-22 18:28:33 +00002691if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002692 main()