blob: 43ecd366102b6fe00082e72354260cbda9dab91b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results directory; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
# nice(2) increment applied when launching autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest tree; defaults to the parent of this file but can be
# overridden via the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make server-side modules importable regardless of how we were launched.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into an execution's results directory by the three
# phases of a job: the main autoserv run, crashinfo collection, and parsing.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared by the functions below.
_db = None            # DatabaseConnection, created in init()
_shutdown = False     # set by handle_sigint(); polled by the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False         # set by --test; swaps in dummy autoserv
_base_url = None              # AFE frontend URL, resolved from config
_notify_email_statuses = []   # statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# debug_scheduler.ini reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Scheduler entry point.

    Runs the real main loop and logs any exception that escapes it.  The
    bare except is intentional: it catches everything (including
    SystemExit/KeyboardInterrupt) so the traceback is recorded before being
    re-raised to terminate the process.
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse options, configure module globals, and run the dispatcher loop.

    Command line: one positional argument, the results directory.  Options:
    --recover-hosts, --logfile, --test.  Runs until _shutdown is set (see
    handle_sigint) or an exception escapes the tick loop.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow a site-specific hook to run before anything else; falls back to
    # the no-op dummy when no site module is installed.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value is a free-form list separated by whitespace/.,;: chars.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main scheduling loop: one tick per iteration until SIGINT.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly shutdown: flush notification email, stop the status server,
    # tear down drones and close the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: request an orderly scheduler shutdown.

    Sets the module-level _shutdown flag, which the loop in
    main_without_exception_handling() polls between ticks.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """One-time scheduler initialization.

    Sets up logging redirection, writes the monitor_db pidfile, connects
    the global database handle, installs the SIGINT handler, and brings up
    the drone manager from config.

    @param logfile - optional path; when set, stdout/stderr are redirected
            via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    # Under --test, point the DB layer at the stress-test database instead
    # of the production one.
    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Drone hostnames come from a comma-separated config value.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect stdout to logfile and stderr to logfile + '.err'.

    Both files are opened unbuffered in append mode.  The process-level
    file descriptors are redirected with dup2 as well, so output from
    child processes and C-level writes also lands in the log files.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    # Rebind the underlying fds first, then the Python-level objects.
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """Build the argv list for launching autoserv.

    @param machines - comma-separated machine list passed via -m.
    @param results_dir - results path, resolved through the drone manager.
    @param extra_args - list appended verbatim to the command line.
    @param job - optional Job; supplies -u (owner) and -l (name).
    @param queue_entry - optional HostQueueEntry; its job is used when no
            job is given explicitly.
    @returns the full argv list.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
234
235
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
238
239
class HostScheduler(object):
    """Matches pending HostQueueEntries to eligible, available hosts.

    refresh() snapshots the relevant database state (ready hosts, ACLs,
    dependencies, labels) into in-memory caches for one scheduling cycle;
    the find_eligible_* methods then consume (mutate) those caches as hosts
    are handed out.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, non-busy hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id iterable as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with %s filled by id_list) and build an id->set map."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)}.

        With flip=True the columns are swapped, giving the reverse mapping.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} the job may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) maps for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild all internal caches for one scheduling cycle.

        @param pending_queue_entries - the HQEs about to be scheduled; only
                their jobs' ACLs/dependencies/ineligibilities are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Reject hosts carrying only_if_needed labels the job didn't ask for."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combined ACL, dependency, only_if_needed and atomic-group check."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if host_id is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first eligible host in the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for queue_entry, or None if none is free."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group = sorted(eligible_hosts_in_group)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
586
587
showard170873e2009-01-07 00:22:26 +0000588class Dispatcher(object):
    def __init__(self):
        # All currently live Agents (work items being driven each tick).
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes: host id -> set(Agent) and queue entry id ->
        # set(Agent), maintained by add_agent()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000599
mbligh36768f02008-02-22 18:28:33 +0000600
    def initialize(self, recover_hosts=True):
        """One-time startup: init cleanup tasks and recover prior state.

        @param recover_hosts - when True, also attempt dead-host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000610
611
    def tick(self):
        """One iteration of the scheduler main loop.

        Refreshes drone state, runs periodic cleanups, processes aborts and
        recurring runs, schedules new jobs, drives all active agents, then
        flushes queued drone actions and notification email.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000621
showard97aed502008-11-04 02:01:24 +0000622
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each decides internally
        whether its interval has elapsed."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
showard170873e2009-01-07 00:22:26 +0000628 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
629 for object_id in object_ids:
630 agent_dict.setdefault(object_id, set()).add(agent)
631
632
633 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
634 for object_id in object_ids:
635 assert object_id in agent_dict
636 agent_dict[object_id].remove(agent)
637
638
    def add_agent(self, agent):
        """Start tracking agent: append to the active list, point it back at
        this dispatcher, and index it by its host and queue entry ids."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000645
showard170873e2009-01-07 00:22:26 +0000646
647 def get_agents_for_entry(self, queue_entry):
648 """
649 Find agents corresponding to the specified queue_entry.
650 """
showardd3dc1992009-04-22 21:01:40 +0000651 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000652
653
654 def host_has_agent(self, host):
655 """
656 Determine if there is currently an Agent present using this host.
657 """
658 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000659
660
    def remove_agent(self, agent):
        """Stop tracking agent: drop it from the active list and from both
        reverse indexes so its hosts/queue entries no longer appear busy."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000667
668
    def _recover_processes(self):
        """Reattach to autoserv/parser processes left over from a previous
        scheduler run, requeue what can't be recovered, and reset drones."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000679
showard170873e2009-01-07 00:22:26 +0000680
    def _register_pidfiles(self):
        """Register every pidfile of every active entry with the drone
        manager so the subsequent refresh() can read them."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # register all three phase pidfiles (autoserv/crashinfo/parser)
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000691
692
showardd3dc1992009-04-22 21:01:40 +0000693 def _recover_entries_with_status(self, status, orphans, pidfile_name,
694 recover_entries_fn):
695 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000696 for queue_entry in queue_entries:
697 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000698 # synchronous job we've already recovered
699 continue
showardd3dc1992009-04-22 21:01:40 +0000700 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000701 execution_tag = queue_entry.execution_tag()
702 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000703 run_monitor.attach_to_existing_process(execution_tag,
704 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000705
706 log_message = ('Recovering %s entry %s ' %
707 (status.lower(),
708 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000709 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000710 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000711 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000712 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000713 continue
mbligh90a549d2008-03-25 23:52:34 +0000714
showard597bfd32009-05-08 18:22:50 +0000715 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000716 run_monitor.get_process())
717 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
718 orphans.discard(run_monitor.get_process())
719
720
721 def _kill_remaining_orphan_processes(self, orphans):
722 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000723 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000724 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000725
showard170873e2009-01-07 00:22:26 +0000726
showardd3dc1992009-04-22 21:01:40 +0000727 def _recover_running_entries(self, orphans):
728 def recover_entries(job, queue_entries, run_monitor):
729 if run_monitor is not None:
730 queue_task = RecoveryQueueTask(job=job,
731 queue_entries=queue_entries,
732 run_monitor=run_monitor)
733 self.add_agent(Agent(tasks=[queue_task],
734 num_processes=len(queue_entries)))
735 # else, _requeue_other_active_entries will cover this
736
737 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
738 orphans, '.autoserv_execute',
739 recover_entries)
740
741
742 def _recover_gathering_entries(self, orphans):
743 def recover_entries(job, queue_entries, run_monitor):
744 gather_task = GatherLogsTask(job, queue_entries,
745 run_monitor=run_monitor)
746 self.add_agent(Agent([gather_task]))
747
748 self._recover_entries_with_status(
749 models.HostQueueEntry.Status.GATHERING,
750 orphans, _CRASHINFO_PID_FILE, recover_entries)
751
752
753 def _recover_parsing_entries(self, orphans):
754 def recover_entries(job, queue_entries, run_monitor):
755 reparse_task = FinalReparseTask(queue_entries,
756 run_monitor=run_monitor)
757 self.add_agent(Agent([reparse_task], num_processes=0))
758
759 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
760 orphans, _PARSER_PID_FILE,
761 recover_entries)
762
763
764 def _recover_all_recoverable_entries(self):
765 orphans = _drone_manager.get_orphaned_autoserv_processes()
766 self._recover_running_entries(orphans)
767 self._recover_gathering_entries(orphans)
768 self._recover_parsing_entries(orphans)
769 self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000770
showard97aed502008-11-04 02:01:24 +0000771
showard170873e2009-01-07 00:22:26 +0000772 def _requeue_other_active_entries(self):
773 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000774 where='active AND NOT complete AND '
775 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000776 for queue_entry in queue_entries:
777 if self.get_agents_for_entry(queue_entry):
778 # entry has already been recovered
779 continue
showardd3dc1992009-04-22 21:01:40 +0000780 if queue_entry.aborted:
781 queue_entry.abort(self)
782 continue
783
784 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000785 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000786 if queue_entry.host:
787 tasks = queue_entry.host.reverify_tasks()
788 self.add_agent(Agent(tasks))
789 agent = queue_entry.requeue()
790
791
    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000807
808
jadmanski0afbb632008-06-06 21:10:57 +0000809 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000810 print_message='Reverifying host %s'):
811 full_where='locked = 0 AND invalid = 0 AND ' + where
812 for host in Host.fetch(where=full_where):
813 if self.host_has_agent(host):
814 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000815 continue
showard170873e2009-01-07 00:22:26 +0000816 if print_message:
showardb18134f2009-03-20 20:52:18 +0000817 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000818 tasks = host.reverify_tasks()
819 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000820
821
jadmanski0afbb632008-06-06 21:10:57 +0000822 def _recover_hosts(self):
823 # recover "Repair Failed" hosts
824 message = 'Reverifying dead host %s'
825 self._reverify_hosts_where("status = 'Repair Failed'",
826 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000827
828
showard04c82c52008-05-29 19:38:12 +0000829
    def _get_pending_queue_entries(self):
        """
        Fetch all queued (not yet active or complete) HostQueueEntries,
        ordered for scheduling.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000836
837
showard89f84db2009-03-12 20:39:13 +0000838 def _refresh_pending_queue_entries(self):
839 """
840 Lookup the pending HostQueueEntries and call our HostScheduler
841 refresh() method given that list. Return the list.
842
843 @returns A list of pending HostQueueEntries sorted in priority order.
844 """
showard63a34772008-08-18 19:32:50 +0000845 queue_entries = self._get_pending_queue_entries()
846 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000847 return []
showardb95b1bd2008-08-15 18:11:04 +0000848
showard63a34772008-08-18 19:32:50 +0000849 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000850
showard89f84db2009-03-12 20:39:13 +0000851 return queue_entries
852
853
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.

        @param queue_entry: a metahost HostQueueEntry with an
                atomic_group_id and no host assigned yet.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry (original plus clones) with its host, in order
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
880
881
882 def _schedule_new_jobs(self):
883 queue_entries = self._refresh_pending_queue_entries()
884 if not queue_entries:
885 return
886
showard63a34772008-08-18 19:32:50 +0000887 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000888 if (queue_entry.atomic_group_id is None or
889 queue_entry.host_id is not None):
890 assigned_host = self._host_scheduler.find_eligible_host(
891 queue_entry)
892 if assigned_host:
893 self._run_queue_entry(queue_entry, assigned_host)
894 else:
895 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000896
897
898 def _run_queue_entry(self, queue_entry, host):
899 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000900 # in some cases (synchronous jobs with run_verify=False), agent may be
901 # None
showard9976ce92008-10-15 20:28:13 +0000902 if agent:
903 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000904
905
jadmanski0afbb632008-06-06 21:10:57 +0000906 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000907 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
908 for agent in self.get_agents_for_entry(entry):
909 agent.abort()
910 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000911
912
showard324bf812009-01-20 23:23:38 +0000913 def _can_start_agent(self, agent, num_started_this_cycle,
914 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000915 # always allow zero-process agents to run
916 if agent.num_processes == 0:
917 return True
918 # don't allow any nonzero-process agents to run after we've reached a
919 # limit (this avoids starvation of many-process agents)
920 if have_reached_limit:
921 return False
922 # total process throttling
showard324bf812009-01-20 23:23:38 +0000923 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000924 return False
925 # if a single agent exceeds the per-cycle throttling, still allow it to
926 # run when it's the first agent in the cycle
927 if num_started_this_cycle == 0:
928 return True
929 # per-cycle throttling
930 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000931 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000932 return False
933 return True
934
935
    def _handle_agents(self):
        # One scheduling pass over all agents: remove finished agents,
        # start eligible idle ones subject to throttling, and tick every
        # agent that is (now) allowed to run.
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once throttled, throttle every later nonzero-process
                    # agent this cycle (see _can_start_agent)
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000954
955
    def _process_recurring_runs(self):
        """
        Instantiate a new job from every RecurringRun template whose start
        date has arrived, then reschedule or delete each template
        according to its loop_count.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts must be re-created for each new job
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         name=job.name,
                                         priority=job.priority,
                                         control_file=job.control_file,
                                         control_type=job.control_type,
                                         is_template=False,
                                         synch_count=job.synch_count,
                                         timeout=job.timeout,
                                         run_verify=job.run_verify,
                                         email_list=job.email_list,
                                         dependencies=dependencies,
                                         reboot_before=job.reboot_before,
                                         reboot_after=job.reboot_after,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a failed job creation shouldn't kill the scheduler loop
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # that was the final scheduled run; drop the template
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1005
1006
class PidfileRunMonitor(object):
    """
    Monitor a process on a drone through its pidfile.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set once the process is known dead without a complete pidfile
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with a nice invocation when nice_level is set."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # used by _handle_no_process() to time out pidfile creation
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Execute command on a drone under working_directory and begin
        monitoring the pidfile it writes.
        """
        assert command is not None
        # fix: use the existing _add_nice_command() helper instead of
        # duplicating its logic inline (the helper was otherwise dead code)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Attach to an already-running process's pidfile (recovery)."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been seen in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid if has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile contents on the drone."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset state before raising so stale data can't leak out
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log + email a pidfile problem and mark the process as lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            # state was frozen by on_lost_process(); nothing more to read
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # fix: the bound exception was unused; the traceback already
            # carries the details
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            # the process has had long enough to write its pidfile;
            # give up on it
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001164
1165
class Agent(object):
    """
    Runs a queue of AgentTasks on behalf of the dispatcher, one at a time.
    Tracks the host and queue-entry IDs its tasks touch so the dispatcher
    can find it, and the number of processes it consumes for throttling.
    """
    def __init__(self, tasks, num_processes=1):
        # task currently executing; None between tasks
        self.active_task = None
        self.queue = None
        # set by Dispatcher.add_agent()
        self.dispatcher = None
        self.num_processes = num_processes

        # union of the IDs claimed by all tasks, used by the dispatcher's
        # host/queue-entry -> agent registries
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # fresh unbounded FIFO of pending tasks
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        # give the task a back-reference so it can reach the dispatcher
        task.agent = self


    def tick(self):
        # advance through tasks until one is still in progress or we run out
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # retire the finished active task (scheduling its failure tasks if
        # it failed), then pull the next task off the queue
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        # drop all remaining queued tasks
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1242
showardd3dc1992009-04-22 21:01:40 +00001243
class AgentTask(object):
    """
    Base class for units of work executed by an Agent, typically wrapping
    a single monitored autoserv process.

    NOTE: pidfile_name and paired_with_pidfile are accepted by __init__
    for subclass signature compatibility but are unused here; subclasses
    pass equivalent values to run() instead.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        # fix: avoid a shared mutable default argument for failure_tasks
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        # tasks to run (in a fresh Agent) if this task fails
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        # back-reference, set by Agent.add_task()
        self.agent = None
        self.monitor = None
        # None until finished(); then True/False
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry IDs this task operates on."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if necessary, then check on its process."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """Finish up once the monitored process has exited."""
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                # still running
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete (idempotent) and run the epilog."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run before the task starts; subclasses may override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        """
        Allocate a temporary results path on the drones.
        NOTE: suffix is currently ignored.
        """
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy this task's log file to the results repository, if any."""
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run after the task finishes; subclasses may override."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any) and mark the task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point self.log_file at a per-host log under hosts/<hostname>/."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Assert all queue entries share one execution tag; return it."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """
        Copy the results of the given entries to the results repository
        and schedule a final reparse for them.  use_monitor defaults to
        this task's own monitor.
        """
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if set) under a PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001376
1377
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single key=value keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the keyval file path.  Subclasses must override this."""
        # fix: raise the exception class; "raise NotImplemented" raised a
        # TypeError (NotImplemented is a singleton, not an exception)
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job completion time in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1405
1406
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv repair (-R) on a host; if the repair fails, optionally
    fails an associated queue entry."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair output goes to a temporary dir; it's only copied into the
        # job's results if the repair fails (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and requeue the entry we may later fail."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # job keyvals are written into the temporary repair results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail self.queue_entry_to_fail: write job keyvals, copy the repair
        logs into the entry's results, reparse them and record the host
        failure.

        No-op for metahost entries and for entries no longer 'Queued' (i.e.
        aborted while the repair ran).
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Set the host's final status; on failure, also fail the entry."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001475
1476
showard8fe93b52008-11-18 17:53:22 +00001477class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001478 def epilog(self):
1479 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001480 should_copy_results = (self.queue_entry and not self.success
1481 and not self.queue_entry.meta_host)
1482 if should_copy_results:
1483 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001484 destination = os.path.join(self.queue_entry.execution_tag(),
1485 os.path.basename(self.log_file))
1486 _drone_manager.copy_to_results_repository(
1487 self.monitor.get_process(), self.log_file,
1488 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001489
1490
class VerifyTask(PreJobTask):
    """Run 'autoserv -v' against a host to check it is usable.

    Exactly one of queue_entry or host must be supplied; a failed verify
    triggers a RepairTask on the host.
    """
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        repair_task = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=[repair_task])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001521
1522
showardd9205182009-04-27 20:09:55 +00001523class QueueTask(AgentTask, TaskWithJobKeyvals):
jadmanski0afbb632008-06-06 21:10:57 +00001524 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001525 self.job = job
1526 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001527 super(QueueTask, self).__init__(cmd, self._execution_tag())
1528 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001529
1530
showard73ec0442009-02-07 02:05:20 +00001531 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001532 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001533
1534
1535 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1536 keyval_contents = '\n'.join(self._format_keyval(key, value)
1537 for key, value in keyval_dict.iteritems())
1538 # always end with a newline to allow additional keyvals to be written
1539 keyval_contents += '\n'
1540 _drone_manager.attach_file_to_execution(self._execution_tag(),
1541 keyval_contents,
1542 file_path=keyval_path)
1543
1544
1545 def _write_keyvals_before_job(self, keyval_dict):
1546 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1547
1548
showard170873e2009-01-07 00:22:26 +00001549 def _write_host_keyvals(self, host):
1550 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1551 host.hostname)
1552 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001553 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1554 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001555
1556
showard170873e2009-01-07 00:22:26 +00001557 def _execution_tag(self):
1558 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001559
1560
jadmanski0afbb632008-06-06 21:10:57 +00001561 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001562 queued_key, queued_time = self._job_queued_keyval(self.job)
1563 self._write_keyvals_before_job({queued_key : queued_time})
jadmanski0afbb632008-06-06 21:10:57 +00001564 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001565 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001566 queue_entry.set_status('Running')
1567 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001568 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001569 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001570 assert len(self.queue_entries) == 1
1571 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001572
1573
showard35162b02009-03-03 02:17:30 +00001574 def _write_lost_process_error_file(self):
1575 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1576 _drone_manager.write_lines_to_file(error_file_path,
1577 [_LOST_PROCESS_ERROR])
1578
1579
showardd3dc1992009-04-22 21:01:40 +00001580 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001581 if not self.monitor:
1582 return
1583
showardd9205182009-04-27 20:09:55 +00001584 self._write_job_finished()
1585
showardd3dc1992009-04-22 21:01:40 +00001586 # both of these conditionals can be true, iff the process ran, wrote a
1587 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001588 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001589 gather_task = GatherLogsTask(self.job, self.queue_entries)
1590 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001591
1592 if self.monitor.lost_process:
1593 self._write_lost_process_error_file()
1594 for queue_entry in self.queue_entries:
1595 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001596
1597
showardcbd74612008-11-19 21:42:02 +00001598 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001599 _drone_manager.write_lines_to_file(
1600 os.path.join(self._execution_tag(), 'status.log'),
1601 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001602 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001603
1604
jadmanskif7fa2cc2008-10-01 14:13:23 +00001605 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001606 if not self.monitor or not self.monitor.has_process():
1607 return
1608
jadmanskif7fa2cc2008-10-01 14:13:23 +00001609 # build up sets of all the aborted_by and aborted_on values
1610 aborted_by, aborted_on = set(), set()
1611 for queue_entry in self.queue_entries:
1612 if queue_entry.aborted_by:
1613 aborted_by.add(queue_entry.aborted_by)
1614 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1615 aborted_on.add(t)
1616
1617 # extract some actual, unique aborted by value and write it out
1618 assert len(aborted_by) <= 1
1619 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001620 aborted_by_value = aborted_by.pop()
1621 aborted_on_value = max(aborted_on)
1622 else:
1623 aborted_by_value = 'autotest_system'
1624 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001625
showarda0382352009-02-11 23:36:43 +00001626 self._write_keyval_after_job("aborted_by", aborted_by_value)
1627 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001628
showardcbd74612008-11-19 21:42:02 +00001629 aborted_on_string = str(datetime.datetime.fromtimestamp(
1630 aborted_on_value))
1631 self._write_status_comment('Job aborted by %s on %s' %
1632 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001633
1634
jadmanski0afbb632008-06-06 21:10:57 +00001635 def abort(self):
1636 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001637 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001638 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001639
1640
jadmanski0afbb632008-06-06 21:10:57 +00001641 def epilog(self):
1642 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001643 self._finish_task()
1644 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001645
1646
mblighbb421852008-03-11 22:36:16 +00001647class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001648 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001649 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001650 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001651
1652
jadmanski0afbb632008-06-06 21:10:57 +00001653 def run(self):
1654 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001655
1656
jadmanski0afbb632008-06-06 21:10:57 +00001657 def prolog(self):
1658 # recovering an existing process - don't do prolog
1659 pass
mblighbb421852008-03-11 22:36:16 +00001660
1661
showardd3dc1992009-04-22 21:01:40 +00001662class PostJobTask(AgentTask):
1663 def __init__(self, queue_entries, pidfile_name, logfile_name,
1664 run_monitor=None):
1665 """
1666 If run_monitor != None, we're recovering a running task.
1667 """
1668 self._queue_entries = queue_entries
1669 self._pidfile_name = pidfile_name
1670 self._run_monitor = run_monitor
1671
1672 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1673 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1674 self._autoserv_monitor = PidfileRunMonitor()
1675 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1676 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1677
1678 if _testing_mode:
1679 command = 'true'
1680 else:
1681 command = self._generate_command(self._results_dir)
1682
1683 super(PostJobTask, self).__init__(cmd=command,
1684 working_directory=self._execution_tag)
1685
1686 self.log_file = os.path.join(self._execution_tag, logfile_name)
1687 self._final_status = self._determine_final_status()
1688
1689
1690 def _generate_command(self, results_dir):
1691 raise NotImplementedError('Subclasses must override this')
1692
1693
1694 def _job_was_aborted(self):
1695 was_aborted = None
1696 for queue_entry in self._queue_entries:
1697 queue_entry.update_from_database()
1698 if was_aborted is None: # first queue entry
1699 was_aborted = bool(queue_entry.aborted)
1700 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1701 email_manager.manager.enqueue_notify_email(
1702 'Inconsistent abort state',
1703 'Queue entries have inconsistent abort state: ' +
1704 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1705 # don't crash here, just assume true
1706 return True
1707 return was_aborted
1708
1709
1710 def _determine_final_status(self):
1711 if self._job_was_aborted():
1712 return models.HostQueueEntry.Status.ABORTED
1713
1714 # we'll use a PidfileRunMonitor to read the autoserv exit status
1715 if self._autoserv_monitor.exit_code() == 0:
1716 return models.HostQueueEntry.Status.COMPLETED
1717 return models.HostQueueEntry.Status.FAILED
1718
1719
1720 def run(self):
1721 if self._run_monitor is not None:
1722 self.monitor = self._run_monitor
1723 else:
1724 # make sure we actually have results to work with.
1725 # this should never happen in normal operation.
1726 if not self._autoserv_monitor.has_process():
1727 email_manager.manager.enqueue_notify_email(
1728 'No results in post-job task',
1729 'No results in post-job task at %s' %
1730 self._autoserv_monitor.pidfile_id)
1731 self.finished(False)
1732 return
1733
1734 super(PostJobTask, self).run(
1735 pidfile_name=self._pidfile_name,
1736 paired_with_pidfile=self._paired_with_pidfile)
1737
1738
1739 def _set_all_statuses(self, status):
1740 for queue_entry in self._queue_entries:
1741 queue_entry.set_status(status)
1742
1743
1744 def abort(self):
1745 # override AgentTask.abort() to avoid killing the process and ending
1746 # the task. post-job tasks continue when the job is aborted.
1747 pass
1748
1749
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the 'autoserv --collect-crashinfo' command line covering
        every host of this job."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release each host, depending on the
        job's reboot_after setting and the final job outcome."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # aborted jobs always get their hosts cleaned up
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        """Only launch crashinfo collection if autoserv died abnormally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001816
1817
showard8fe93b52008-11-18 17:53:22 +00001818class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001819 def __init__(self, host=None, queue_entry=None):
1820 assert bool(host) ^ bool(queue_entry)
1821 if queue_entry:
1822 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001823 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001824 self.host = host
showard170873e2009-01-07 00:22:26 +00001825
1826 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001827 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1828 ['--cleanup'],
1829 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001830 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001831 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1832 failure_tasks=[repair_task])
1833
1834 self._set_ids(host=host, queue_entries=[queue_entry])
1835 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001836
mblighd5c95802008-03-05 00:33:46 +00001837
jadmanski0afbb632008-06-06 21:10:57 +00001838 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001839 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001840 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001841 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001842
mblighd5c95802008-03-05 00:33:46 +00001843
showard21baa452008-10-21 00:08:39 +00001844 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001845 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001846 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001847 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001848 self.host.update_field('dirty', 0)
1849
1850
showardd3dc1992009-04-22 21:01:40 +00001851class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001852 _num_running_parses = 0
1853
showardd3dc1992009-04-22 21:01:40 +00001854 def __init__(self, queue_entries, run_monitor=None):
1855 super(FinalReparseTask, self).__init__(queue_entries,
1856 pidfile_name=_PARSER_PID_FILE,
1857 logfile_name='.parse.log',
1858 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001859 # don't use _set_ids, since we don't want to set the host_ids
1860 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001861 self._parse_started = False
1862
showard97aed502008-11-04 02:01:24 +00001863
1864 @classmethod
1865 def _increment_running_parses(cls):
1866 cls._num_running_parses += 1
1867
1868
1869 @classmethod
1870 def _decrement_running_parses(cls):
1871 cls._num_running_parses -= 1
1872
1873
1874 @classmethod
1875 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001876 return (cls._num_running_parses <
1877 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001878
1879
1880 def prolog(self):
1881 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001882 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001883
1884
1885 def epilog(self):
1886 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001887 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001888
1889
showardd3dc1992009-04-22 21:01:40 +00001890 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001891 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001892 results_dir]
showard97aed502008-11-04 02:01:24 +00001893
1894
showard08a36412009-05-05 01:01:13 +00001895 def tick(self):
1896 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001897 # and we can, at which point we revert to default behavior
1898 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001899 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001900 else:
1901 self._try_starting_parse()
1902
1903
1904 def run(self):
1905 # override run() to not actually run unless we can
1906 self._try_starting_parse()
1907
1908
1909 def _try_starting_parse(self):
1910 if not self._can_run_new_parse():
1911 return
showard170873e2009-01-07 00:22:26 +00001912
showard97aed502008-11-04 02:01:24 +00001913 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001914 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001915
showard97aed502008-11-04 02:01:24 +00001916 self._increment_running_parses()
1917 self._parse_started = True
1918
1919
1920 def finished(self, success):
1921 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001922 if self._parse_started:
1923 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001924
1925
showardc9ae1782009-01-30 01:42:37 +00001926class SetEntryPendingTask(AgentTask):
1927 def __init__(self, queue_entry):
1928 super(SetEntryPendingTask, self).__init__(cmd='')
1929 self._queue_entry = queue_entry
1930 self._set_ids(queue_entries=[queue_entry])
1931
1932
1933 def run(self):
1934 agent = self._queue_entry.on_pending()
1935 if agent:
1936 self.agent.dispatcher.add_agent(agent)
1937 self.finished(True)
1938
1939
showarda3c58572009-03-12 20:36:59 +00001940class DBError(Exception):
1941 """Raised by the DBObject constructor when its select fails."""
1942
1943
mbligh36768f02008-02-22 18:28:33 +00001944class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001945 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001946
1947 # Subclasses MUST override these:
1948 _table_name = ''
1949 _fields = ()
1950
showarda3c58572009-03-12 20:36:59 +00001951 # A mapping from (type, id) to the instance of the object for that
1952 # particular id. This prevents us from creating new Job() and Host()
1953 # instances for every HostQueueEntry object that we instantiate as
1954 # multiple HQEs often share the same Job.
1955 _instances_by_type_and_id = weakref.WeakValueDictionary()
1956 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001957
showarda3c58572009-03-12 20:36:59 +00001958
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                # cache hit -- __init__ will still run on this instance, but
                # may return early (see always_query handling in __init__)
                return instance
        # cache miss, or row-based construction: make a fresh instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1969
1970
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """Initialize from a database id OR a pre-fetched row (exactly one).

        new_record: True if this object doesn't yet exist in the database
                (save() will INSERT it).
        always_query: if False, a cached, already-initialized instance skips
                re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # requerying an existing instance: warn if the DB has diverged
            # from our in-memory state before overwriting it
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
1999
2000
    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache.

        Forces subsequent constructions to re-query the database rather than
        reusing cached instances.
        """
        cls._instances_by_type_and_id.clear()
2005
2006
showardccbd6c52009-03-21 00:10:21 +00002007 def _fetch_row_from_db(self, row_id):
2008 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2009 rows = _db.execute(sql, (row_id,))
2010 if not rows:
showard76e29d12009-04-15 21:53:10 +00002011 raise DBError("row not found (table=%s, row id=%s)"
2012 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002013 return rows[0]
2014
2015
showarda3c58572009-03-12 20:36:59 +00002016 def _assert_row_length(self, row):
2017 assert len(row) == len(self._fields), (
2018 "table = %s, row = %s/%d, fields = %s/%d" % (
2019 self.__table, row, len(row), self._fields, len(self._fields)))
2020
2021
2022 def _compare_fields_in_row(self, row):
2023 """
2024 Given a row as returned by a SELECT query, compare it to our existing
2025 in memory fields.
2026
2027 @param row - A sequence of values corresponding to fields named in
2028 The class attribute _fields.
2029
2030 @returns A dictionary listing the differences keyed by field name
2031 containing tuples of (current_value, row_value).
2032 """
2033 self._assert_row_length(row)
2034 differences = {}
2035 for field, row_value in itertools.izip(self._fields, row):
2036 current_value = getattr(self, field)
2037 if current_value != row_value:
2038 differences[field] = (current_value, row_value)
2039 return differences
showard2bab8f42008-11-12 18:15:22 +00002040
2041
2042 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002043 """
2044 Update our field attributes using a single row returned by SELECT.
2045
2046 @param row - A sequence of values corresponding to fields named in
2047 the class fields list.
2048 """
2049 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002050
showard2bab8f42008-11-12 18:15:22 +00002051 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002052 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002053 setattr(self, field, value)
2054 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002055
showard2bab8f42008-11-12 18:15:22 +00002056 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002057
mblighe2586682008-02-29 22:45:46 +00002058
showardccbd6c52009-03-21 00:10:21 +00002059 def update_from_database(self):
2060 assert self.id is not None
2061 row = self._fetch_row_from_db(self.id)
2062 self._update_fields_from_row(row)
2063
2064
jadmanski0afbb632008-06-06 21:10:57 +00002065 def count(self, where, table = None):
2066 if not table:
2067 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002068
jadmanski0afbb632008-06-06 21:10:57 +00002069 rows = _db.execute("""
2070 SELECT count(*) FROM %s
2071 WHERE %s
2072 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002073
jadmanski0afbb632008-06-06 21:10:57 +00002074 assert len(rows) == 1
2075
2076 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002077
2078
showardd3dc1992009-04-22 21:01:40 +00002079 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002080 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002081
showard2bab8f42008-11-12 18:15:22 +00002082 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002083 return
mbligh36768f02008-02-22 18:28:33 +00002084
mblighf8c624d2008-07-03 16:58:45 +00002085 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002086 _db.execute(query, (value, self.id))
2087
showard2bab8f42008-11-12 18:15:22 +00002088 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def save(self):
2092 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002093 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002094 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002095 values = []
2096 for key in keys:
2097 value = getattr(self, key)
2098 if value is None:
2099 values.append('NULL')
2100 else:
2101 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002102 values_str = ','.join(values)
2103 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2104 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002105 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002106 # Update our id to the one the database just assigned to us.
2107 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002108
2109
jadmanski0afbb632008-06-06 21:10:57 +00002110 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002111 self._instances_by_type_and_id.pop((type(self), id), None)
2112 self._initialized = False
2113 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002114 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2115 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002116
2117
showard63a34772008-08-18 19:32:50 +00002118 @staticmethod
2119 def _prefix_with(string, prefix):
2120 if string:
2121 string = prefix + string
2122 return string
2123
2124
jadmanski0afbb632008-06-06 21:10:57 +00002125 @classmethod
showard989f25d2008-10-01 11:38:11 +00002126 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002127 """
2128 Construct instances of our class based on the given database query.
2129
2130 @yields One class instance for each row fetched.
2131 """
showard63a34772008-08-18 19:32:50 +00002132 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2133 where = cls._prefix_with(where, 'WHERE ')
2134 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002135 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002136 'joins' : joins,
2137 'where' : where,
2138 'order_by' : order_by})
2139 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002140 for row in rows:
2141 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002142
mbligh36768f02008-02-22 18:28:33 +00002143
class IneligibleHostQueue(DBObject):
    """A block preventing a specific job from being scheduled on a
    specific host; one row per (job, host) pair."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002147
2148
showard89f84db2009-03-12 20:39:13 +00002149class AtomicGroup(DBObject):
2150 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002151 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2152 'invalid')
showard89f84db2009-03-12 20:39:13 +00002153
2154
showard989f25d2008-10-01 11:38:11 +00002155class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002156 _table_name = 'labels'
2157 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002158 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002159
2160
mbligh36768f02008-02-22 18:28:33 +00002161class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002162 _table_name = 'hosts'
2163 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2164 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2165
2166
jadmanski0afbb632008-06-06 21:10:57 +00002167 def current_task(self):
2168 rows = _db.execute("""
2169 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2170 """, (self.id,))
2171
2172 if len(rows) == 0:
2173 return None
2174 else:
2175 assert len(rows) == 1
2176 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002177 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002178
2179
jadmanski0afbb632008-06-06 21:10:57 +00002180 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002181 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002182 if self.current_task():
2183 self.current_task().requeue()
2184
showard6ae5ea92009-02-25 00:11:51 +00002185
jadmanski0afbb632008-06-06 21:10:57 +00002186 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002187 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002188 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002189
2190
showard170873e2009-01-07 00:22:26 +00002191 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002192 """
showard170873e2009-01-07 00:22:26 +00002193 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002194 """
2195 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002196 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002197 FROM labels
2198 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002199 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002200 ORDER BY labels.name
2201 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002202 platform = None
2203 all_labels = []
2204 for label_name, is_platform in rows:
2205 if is_platform:
2206 platform = label_name
2207 all_labels.append(label_name)
2208 return platform, all_labels
2209
2210
2211 def reverify_tasks(self):
2212 cleanup_task = CleanupTask(host=self)
2213 verify_task = VerifyTask(host=self)
2214 # just to make sure this host does not get taken away
2215 self.set_status('Cleaning')
2216 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002217
2218
mbligh36768f02008-02-22 18:28:33 +00002219class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002220 _table_name = 'host_queue_entries'
2221 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002222 'active', 'complete', 'deleted', 'execution_subdir',
showardd3dc1992009-04-22 21:01:40 +00002223 'atomic_group_id', 'aborted')
showard6ae5ea92009-02-25 00:11:51 +00002224
2225
showarda3c58572009-03-12 20:36:59 +00002226 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002227 assert id or row
showarda3c58572009-03-12 20:36:59 +00002228 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002229 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002230
jadmanski0afbb632008-06-06 21:10:57 +00002231 if self.host_id:
2232 self.host = Host(self.host_id)
2233 else:
2234 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002235
showard170873e2009-01-07 00:22:26 +00002236 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002237 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002238
2239
showard89f84db2009-03-12 20:39:13 +00002240 @classmethod
2241 def clone(cls, template):
2242 """
2243 Creates a new row using the values from a template instance.
2244
2245 The new instance will not exist in the database or have a valid
2246 id attribute until its save() method is called.
2247 """
2248 assert isinstance(template, cls)
2249 new_row = [getattr(template, field) for field in cls._fields]
2250 clone = cls(row=new_row, new_record=True)
2251 clone.id = None
2252 return clone
2253
2254
showardc85c21b2008-11-24 22:17:37 +00002255 def _view_job_url(self):
2256 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2257
2258
jadmanski0afbb632008-06-06 21:10:57 +00002259 def set_host(self, host):
2260 if host:
2261 self.queue_log_record('Assigning host ' + host.hostname)
2262 self.update_field('host_id', host.id)
2263 self.update_field('active', True)
2264 self.block_host(host.id)
2265 else:
2266 self.queue_log_record('Releasing host')
2267 self.unblock_host(self.host.id)
2268 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002269
jadmanski0afbb632008-06-06 21:10:57 +00002270 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002271
2272
jadmanski0afbb632008-06-06 21:10:57 +00002273 def get_host(self):
2274 return self.host
mbligh36768f02008-02-22 18:28:33 +00002275
2276
jadmanski0afbb632008-06-06 21:10:57 +00002277 def queue_log_record(self, log_line):
2278 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002279 _drone_manager.write_lines_to_file(self.queue_log_path,
2280 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002281
2282
jadmanski0afbb632008-06-06 21:10:57 +00002283 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002284 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002285 row = [0, self.job.id, host_id]
2286 block = IneligibleHostQueue(row=row, new_record=True)
2287 block.save()
mblighe2586682008-02-29 22:45:46 +00002288
2289
jadmanski0afbb632008-06-06 21:10:57 +00002290 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002291 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002292 blocks = IneligibleHostQueue.fetch(
2293 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2294 for block in blocks:
2295 block.delete()
mblighe2586682008-02-29 22:45:46 +00002296
2297
showard2bab8f42008-11-12 18:15:22 +00002298 def set_execution_subdir(self, subdir=None):
2299 if subdir is None:
2300 assert self.get_host()
2301 subdir = self.get_host().hostname
2302 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002303
2304
showard6355f6b2008-12-05 18:52:13 +00002305 def _get_hostname(self):
2306 if self.host:
2307 return self.host.hostname
2308 return 'no host'
2309
2310
showard170873e2009-01-07 00:22:26 +00002311 def __str__(self):
2312 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2313
2314
jadmanski0afbb632008-06-06 21:10:57 +00002315 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002316 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002317
showardb18134f2009-03-20 20:52:18 +00002318 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002319
showardc85c21b2008-11-24 22:17:37 +00002320 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002321 self.update_field('complete', False)
2322 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002323
jadmanski0afbb632008-06-06 21:10:57 +00002324 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002325 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002326 self.update_field('complete', False)
2327 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002328
showardc85c21b2008-11-24 22:17:37 +00002329 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002330 self.update_field('complete', True)
2331 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002332
2333 should_email_status = (status.lower() in _notify_email_statuses or
2334 'all' in _notify_email_statuses)
2335 if should_email_status:
2336 self._email_on_status(status)
2337
2338 self._email_on_job_complete()
2339
2340
2341 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002342 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002343
2344 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2345 self.job.id, self.job.name, hostname, status)
2346 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2347 self.job.id, self.job.name, hostname, status,
2348 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002349 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002350
2351
2352 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002353 if not self.job.is_finished():
2354 return
showard542e8402008-09-19 20:16:18 +00002355
showardc85c21b2008-11-24 22:17:37 +00002356 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002357 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002358 for queue_entry in hosts_queue:
2359 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002360 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002361 queue_entry.status))
2362
2363 summary_text = "\n".join(summary_text)
2364 status_counts = models.Job.objects.get_status_counts(
2365 [self.job.id])[self.job.id]
2366 status = ', '.join('%d %s' % (count, status) for status, count
2367 in status_counts.iteritems())
2368
2369 subject = 'Autotest: Job ID: %s "%s" %s' % (
2370 self.job.id, self.job.name, status)
2371 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2372 self.job.id, self.job.name, status, self._view_job_url(),
2373 summary_text)
showard170873e2009-01-07 00:22:26 +00002374 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002375
2376
showard89f84db2009-03-12 20:39:13 +00002377 def run(self, assigned_host=None):
2378 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002379 assert assigned_host
2380 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002381 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002382
showardb18134f2009-03-20 20:52:18 +00002383 logging.info("%s/%s/%s scheduled on %s, status=%s",
2384 self.job.name, self.meta_host, self.atomic_group_id,
2385 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002386
jadmanski0afbb632008-06-06 21:10:57 +00002387 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002388
showard6ae5ea92009-02-25 00:11:51 +00002389
jadmanski0afbb632008-06-06 21:10:57 +00002390 def requeue(self):
2391 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002392 # verify/cleanup failure sets the execution subdir, so reset it here
2393 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002394 if self.meta_host:
2395 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002396
2397
jadmanski0afbb632008-06-06 21:10:57 +00002398 def handle_host_failure(self):
2399 """\
2400 Called when this queue entry's host has failed verification and
2401 repair.
2402 """
2403 assert not self.meta_host
2404 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002405 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002406
2407
jadmanskif7fa2cc2008-10-01 14:13:23 +00002408 @property
2409 def aborted_by(self):
2410 self._load_abort_info()
2411 return self._aborted_by
2412
2413
2414 @property
2415 def aborted_on(self):
2416 self._load_abort_info()
2417 return self._aborted_on
2418
2419
2420 def _load_abort_info(self):
2421 """ Fetch info about who aborted the job. """
2422 if hasattr(self, "_aborted_by"):
2423 return
2424 rows = _db.execute("""
2425 SELECT users.login, aborted_host_queue_entries.aborted_on
2426 FROM aborted_host_queue_entries
2427 INNER JOIN users
2428 ON users.id = aborted_host_queue_entries.aborted_by_id
2429 WHERE aborted_host_queue_entries.queue_entry_id = %s
2430 """, (self.id,))
2431 if rows:
2432 self._aborted_by, self._aborted_on = rows[0]
2433 else:
2434 self._aborted_by = self._aborted_on = None
2435
2436
showardb2e2c322008-10-14 17:33:55 +00002437 def on_pending(self):
2438 """
2439 Called when an entry in a synchronous job has passed verify. If the
2440 job is ready to run, returns an agent to run the job. Returns None
2441 otherwise.
2442 """
2443 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002444 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002445 if self.job.is_ready():
2446 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002447 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002448 return None
2449
2450
showardd3dc1992009-04-22 21:01:40 +00002451 def abort(self, dispatcher):
2452 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002453
showardd3dc1992009-04-22 21:01:40 +00002454 Status = models.HostQueueEntry.Status
2455 has_running_job_agent = (
2456 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2457 and dispatcher.get_agents_for_entry(self))
2458 if has_running_job_agent:
2459 # do nothing; post-job tasks will finish and then mark this entry
2460 # with status "Aborted" and take care of the host
2461 return
2462
2463 if self.status in (Status.STARTING, Status.PENDING):
2464 self.host.set_status(models.Host.Status.READY)
2465 elif self.status == Status.VERIFYING:
2466 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2467
2468 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002469
2470 def execution_tag(self):
2471 assert self.execution_subdir
2472 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002473
2474
mbligh36768f02008-02-22 18:28:33 +00002475class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002476 _table_name = 'jobs'
2477 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2478 'control_type', 'created_on', 'synch_count', 'timeout',
2479 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2480
2481
showarda3c58572009-03-12 20:36:59 +00002482 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002483 assert id or row
showarda3c58572009-03-12 20:36:59 +00002484 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002485
mblighe2586682008-02-29 22:45:46 +00002486
jadmanski0afbb632008-06-06 21:10:57 +00002487 def is_server_job(self):
2488 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002489
2490
showard170873e2009-01-07 00:22:26 +00002491 def tag(self):
2492 return "%s-%s" % (self.id, self.owner)
2493
2494
jadmanski0afbb632008-06-06 21:10:57 +00002495 def get_host_queue_entries(self):
2496 rows = _db.execute("""
2497 SELECT * FROM host_queue_entries
2498 WHERE job_id= %s
2499 """, (self.id,))
2500 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002501
jadmanski0afbb632008-06-06 21:10:57 +00002502 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002503
jadmanski0afbb632008-06-06 21:10:57 +00002504 return entries
mbligh36768f02008-02-22 18:28:33 +00002505
2506
jadmanski0afbb632008-06-06 21:10:57 +00002507 def set_status(self, status, update_queues=False):
2508 self.update_field('status',status)
2509
2510 if update_queues:
2511 for queue_entry in self.get_host_queue_entries():
2512 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002513
2514
jadmanski0afbb632008-06-06 21:10:57 +00002515 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002516 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2517 status='Pending')
2518 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002519
2520
jadmanski0afbb632008-06-06 21:10:57 +00002521 def num_machines(self, clause = None):
2522 sql = "job_id=%s" % self.id
2523 if clause:
2524 sql += " AND (%s)" % clause
2525 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002526
2527
jadmanski0afbb632008-06-06 21:10:57 +00002528 def num_queued(self):
2529 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002530
2531
jadmanski0afbb632008-06-06 21:10:57 +00002532 def num_active(self):
2533 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002534
2535
jadmanski0afbb632008-06-06 21:10:57 +00002536 def num_complete(self):
2537 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002538
2539
jadmanski0afbb632008-06-06 21:10:57 +00002540 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002541 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002542
mbligh36768f02008-02-22 18:28:33 +00002543
showard6bb7c292009-01-30 01:44:51 +00002544 def _not_yet_run_entries(self, include_verifying=True):
2545 statuses = [models.HostQueueEntry.Status.QUEUED,
2546 models.HostQueueEntry.Status.PENDING]
2547 if include_verifying:
2548 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2549 return models.HostQueueEntry.objects.filter(job=self.id,
2550 status__in=statuses)
2551
2552
2553 def _stop_all_entries(self):
2554 entries_to_stop = self._not_yet_run_entries(
2555 include_verifying=False)
2556 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002557 assert not child_entry.complete, (
2558 '%s status=%s, active=%s, complete=%s' %
2559 (child_entry.id, child_entry.status, child_entry.active,
2560 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002561 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2562 child_entry.host.status = models.Host.Status.READY
2563 child_entry.host.save()
2564 child_entry.status = models.HostQueueEntry.Status.STOPPED
2565 child_entry.save()
2566
showard2bab8f42008-11-12 18:15:22 +00002567 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002568 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002569 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002570 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002571
2572
jadmanski0afbb632008-06-06 21:10:57 +00002573 def write_to_machines_file(self, queue_entry):
2574 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002575 file_path = os.path.join(self.tag(), '.machines')
2576 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002577
2578
showard2bab8f42008-11-12 18:15:22 +00002579 def _next_group_name(self):
2580 query = models.HostQueueEntry.objects.filter(
2581 job=self.id).values('execution_subdir').distinct()
2582 subdirs = (entry['execution_subdir'] for entry in query)
2583 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2584 ids = [int(match.group(1)) for match in groups if match]
2585 if ids:
2586 next_id = max(ids) + 1
2587 else:
2588 next_id = 0
2589 return "group%d" % next_id
2590
2591
showard170873e2009-01-07 00:22:26 +00002592 def _write_control_file(self, execution_tag):
2593 control_path = _drone_manager.attach_file_to_execution(
2594 execution_tag, self.control_file)
2595 return control_path
mbligh36768f02008-02-22 18:28:33 +00002596
showardb2e2c322008-10-14 17:33:55 +00002597
showard2bab8f42008-11-12 18:15:22 +00002598 def get_group_entries(self, queue_entry_from_group):
2599 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002600 return list(HostQueueEntry.fetch(
2601 where='job_id=%s AND execution_subdir=%s',
2602 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002603
2604
showardb2e2c322008-10-14 17:33:55 +00002605 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002606 assert queue_entries
2607 execution_tag = queue_entries[0].execution_tag()
2608 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002609 hostnames = ','.join([entry.get_host().hostname
2610 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002611
showard87ba02a2009-04-20 19:37:32 +00002612 params = _autoserv_command_line(
2613 hostnames, execution_tag,
2614 ['-P', execution_tag, '-n',
2615 _drone_manager.absolute_path(control_path)],
2616 job=self)
mbligh36768f02008-02-22 18:28:33 +00002617
jadmanski0afbb632008-06-06 21:10:57 +00002618 if not self.is_server_job():
2619 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002620
showardb2e2c322008-10-14 17:33:55 +00002621 return params
mblighe2586682008-02-29 22:45:46 +00002622
mbligh36768f02008-02-22 18:28:33 +00002623
showardc9ae1782009-01-30 01:42:37 +00002624 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002625 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002626 return True
showard0fc38302008-10-23 00:44:07 +00002627 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002628 return queue_entry.get_host().dirty
2629 return False
showard21baa452008-10-21 00:08:39 +00002630
showardc9ae1782009-01-30 01:42:37 +00002631
2632 def _should_run_verify(self, queue_entry):
2633 do_not_verify = (queue_entry.host.protection ==
2634 host_protections.Protection.DO_NOT_VERIFY)
2635 if do_not_verify:
2636 return False
2637 return self.run_verify
2638
2639
2640 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002641 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002642 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002643 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002644 if self._should_run_verify(queue_entry):
2645 tasks.append(VerifyTask(queue_entry=queue_entry))
2646 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002647 return tasks
2648
2649
showard2bab8f42008-11-12 18:15:22 +00002650 def _assign_new_group(self, queue_entries):
2651 if len(queue_entries) == 1:
2652 group_name = queue_entries[0].get_host().hostname
2653 else:
2654 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002655 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002656 self.id, [entry.host.hostname for entry in queue_entries],
2657 group_name)
2658
2659 for queue_entry in queue_entries:
2660 queue_entry.set_execution_subdir(group_name)
2661
2662
2663 def _choose_group_to_run(self, include_queue_entry):
2664 chosen_entries = [include_queue_entry]
2665
2666 num_entries_needed = self.synch_count - 1
2667 if num_entries_needed > 0:
2668 pending_entries = HostQueueEntry.fetch(
2669 where='job_id = %s AND status = "Pending" AND id != %s',
2670 params=(self.id, include_queue_entry.id))
2671 chosen_entries += list(pending_entries)[:num_entries_needed]
2672
2673 self._assign_new_group(chosen_entries)
2674 return chosen_entries
2675
2676
2677 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002678 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002679 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2680 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002681
showard2bab8f42008-11-12 18:15:22 +00002682 queue_entries = self._choose_group_to_run(queue_entry)
2683 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002684
2685
2686 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002687 for queue_entry in queue_entries:
2688 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002689 params = self._get_autoserv_params(queue_entries)
2690 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2691 cmd=params)
2692 tasks = initial_tasks + [queue_task]
2693 entry_ids = [entry.id for entry in queue_entries]
2694
showard170873e2009-01-07 00:22:26 +00002695 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002696
2697
mbligh36768f02008-02-22 18:28:33 +00002698if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002699 main()