blob: d2ff7a9ec77490a1fcaef58dcd08dfd3fb7106e5 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results location; overwritten from the command line in
# main_without_exception_handling().
RESULTS_DIR = '.'
# nice(2) level autoserv child processes are launched at
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the scheduler's database settings
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# allow the autotest install location to be overridden from the environment
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names dropped into an execution's results directory by the
# processes the scheduler launches (autoserv, crashinfo collection, parser)
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state; _db and _base_url are filled in during
# startup (init() / main_without_exception_handling())
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
mbligh36768f02008-02-22 18:28:33 +000082def main():
showard27f33872009-04-07 18:20:53 +000083 try:
84 main_without_exception_handling()
showard29caa4b2009-05-26 19:27:09 +000085 except SystemExit:
86 raise
showard27f33872009-04-07 18:20:53 +000087 except:
88 logging.exception('Exception escaping in monitor_db')
89 raise
90
91
def main_without_exception_handling():
    """
    Parse command-line options, initialize module-level scheduler state and
    run the dispatcher loop until _shutdown is set.

    Exits via sys.exit(1) when neither AUTOTEST_WEB.base_url nor a [SERVER]
    hostname is available in the config file.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # give a site-specific hook a chance to run, falling back to the no-op
    # dummy when no site_monitor_db module is installed
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # accept whitespace/comma/semicolon/colon separated status names
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    # expose scheduler status over HTTP for monitoring
    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main scheduling loop: one tick per iteration until SIGINT flips
        # the _shutdown flag
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # best-effort teardown: flush pending emails, stop the status server,
    # shut down drones and drop the DB connection
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
173
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the scheduler.

    Only flips the module-level _shutdown flag; the main loop polls the flag
    once per tick and performs the actual teardown.
    """
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000178
179
def init(logfile):
    """
    One-time scheduler startup: logging, pidfile, database connection,
    Django autocommit, SIGINT handler and drone manager initialization.

    @param logfile - Path to redirect stdout/stderr to, or a false value to
            leave the process streams alone.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the scheduler at the stress-test database instead of the
        # production one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # drones (execution hosts) and the results repository host come from the
    # scheduler config section, defaulting to an all-local setup
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
214
def enable_logging(logfile):
    """
    Redirect this process's stdout to logfile and stderr to logfile + '.err'.

    Both files are opened unbuffered in append mode, and dup2() replaces the
    underlying file descriptors so that output written by child processes or
    C-level code is captured as well.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # swap the real fds first, then rebind the Python-level stream objects
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000227
228
showard87ba02a2009-04-20 19:37:32 +0000229def _autoserv_command_line(machines, results_dir, extra_args, job=None,
230 queue_entry=None):
showardf1ae3542009-05-11 19:26:02 +0000231 """
232 @returns The autoserv command line as a list of executable + parameters.
233
234 @param machines - string - A machine or comma separated list of machines
235 for the (-m) flag.
236 @param results_dir - string - Where the results will be written (-r).
237 @param extra_args - list - Additional arguments to pass to autoserv.
238 @param job - Job object - If supplied, -u owner and -l name parameters
239 will be added.
240 @param queue_entry - A HostQueueEntry object - If supplied and no Job
241 object was supplied, this will be used to lookup the Job object.
242 """
showard87ba02a2009-04-20 19:37:32 +0000243 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
244 '-r', _drone_manager.absolute_path(results_dir)]
245 if job or queue_entry:
246 if not job:
247 job = queue_entry.job
248 autoserv_argv += ['-u', job.owner, '-l', job.name]
249 return autoserv_argv + extra_args
250
251
showard89f84db2009-03-12 20:39:13 +0000252class SchedulerError(Exception):
253 """Raised by HostScheduler when an inconsistent state occurs."""
254
255
showard63a34772008-08-18 19:32:50 +0000256class HostScheduler(object):
257 def _get_ready_hosts(self):
258 # avoid any host with a currently active queue entry against it
259 hosts = Host.fetch(
260 joins='LEFT JOIN host_queue_entries AS active_hqe '
261 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000262 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000263 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000264 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000265 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
266 return dict((host.id, host) for host in hosts)
267
268
269 @staticmethod
270 def _get_sql_id_list(id_list):
271 return ','.join(str(item_id) for item_id in id_list)
272
273
274 @classmethod
showard989f25d2008-10-01 11:38:11 +0000275 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000276 if not id_list:
277 return {}
showard63a34772008-08-18 19:32:50 +0000278 query %= cls._get_sql_id_list(id_list)
279 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000280 return cls._process_many2many_dict(rows, flip)
281
282
283 @staticmethod
284 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000285 result = {}
286 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000287 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000288 if flip:
289 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000290 result.setdefault(left_id, set()).add(right_id)
291 return result
292
293
294 @classmethod
295 def _get_job_acl_groups(cls, job_ids):
296 query = """
showardd9ac4452009-02-07 02:04:37 +0000297 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000298 FROM jobs
299 INNER JOIN users ON users.login = jobs.owner
300 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
301 WHERE jobs.id IN (%s)
302 """
303 return cls._get_many2many_dict(query, job_ids)
304
305
306 @classmethod
307 def _get_job_ineligible_hosts(cls, job_ids):
308 query = """
309 SELECT job_id, host_id
310 FROM ineligible_host_queues
311 WHERE job_id IN (%s)
312 """
313 return cls._get_many2many_dict(query, job_ids)
314
315
316 @classmethod
showard989f25d2008-10-01 11:38:11 +0000317 def _get_job_dependencies(cls, job_ids):
318 query = """
319 SELECT job_id, label_id
320 FROM jobs_dependency_labels
321 WHERE job_id IN (%s)
322 """
323 return cls._get_many2many_dict(query, job_ids)
324
325
326 @classmethod
showard63a34772008-08-18 19:32:50 +0000327 def _get_host_acls(cls, host_ids):
328 query = """
showardd9ac4452009-02-07 02:04:37 +0000329 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000330 FROM acl_groups_hosts
331 WHERE host_id IN (%s)
332 """
333 return cls._get_many2many_dict(query, host_ids)
334
335
336 @classmethod
337 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000338 if not host_ids:
339 return {}, {}
showard63a34772008-08-18 19:32:50 +0000340 query = """
341 SELECT label_id, host_id
342 FROM hosts_labels
343 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000344 """ % cls._get_sql_id_list(host_ids)
345 rows = _db.execute(query)
346 labels_to_hosts = cls._process_many2many_dict(rows)
347 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
348 return labels_to_hosts, hosts_to_labels
349
350
351 @classmethod
352 def _get_labels(cls):
353 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000354
355
356 def refresh(self, pending_queue_entries):
357 self._hosts_available = self._get_ready_hosts()
358
359 relevant_jobs = [queue_entry.job_id
360 for queue_entry in pending_queue_entries]
361 self._job_acls = self._get_job_acl_groups(relevant_jobs)
362 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000363 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000364
365 host_ids = self._hosts_available.keys()
366 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000367 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
368
369 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000370
371
372 def _is_acl_accessible(self, host_id, queue_entry):
373 job_acls = self._job_acls.get(queue_entry.job_id, set())
374 host_acls = self._host_acls.get(host_id, set())
375 return len(host_acls.intersection(job_acls)) > 0
376
377
showard989f25d2008-10-01 11:38:11 +0000378 def _check_job_dependencies(self, job_dependencies, host_labels):
379 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000380 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000381
382
383 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
384 queue_entry):
showardade14e22009-01-26 22:38:32 +0000385 if not queue_entry.meta_host:
386 # bypass only_if_needed labels when a specific host is selected
387 return True
388
showard989f25d2008-10-01 11:38:11 +0000389 for label_id in host_labels:
390 label = self._labels[label_id]
391 if not label.only_if_needed:
392 # we don't care about non-only_if_needed labels
393 continue
394 if queue_entry.meta_host == label_id:
395 # if the label was requested in a metahost it's OK
396 continue
397 if label_id not in job_dependencies:
398 return False
399 return True
400
401
showard89f84db2009-03-12 20:39:13 +0000402 def _check_atomic_group_labels(self, host_labels, queue_entry):
403 """
404 Determine if the given HostQueueEntry's atomic group settings are okay
405 to schedule on a host with the given labels.
406
407 @param host_labels - A list of label ids that the host has.
408 @param queue_entry - The HostQueueEntry being considered for the host.
409
410 @returns True if atomic group settings are okay, False otherwise.
411 """
412 return (self._get_host_atomic_group_id(host_labels) ==
413 queue_entry.atomic_group_id)
414
415
416 def _get_host_atomic_group_id(self, host_labels):
417 """
418 Return the atomic group label id for a host with the given set of
419 labels if any, or None otherwise. Raises an exception if more than
420 one atomic group are found in the set of labels.
421
422 @param host_labels - A list of label ids that the host has.
423
424 @returns The id of the atomic group found on a label in host_labels
425 or None if no atomic group label is found.
426 @raises SchedulerError - If more than one atomic group label is found.
427 """
428 atomic_ids = [self._labels[label_id].atomic_group_id
429 for label_id in host_labels
430 if self._labels[label_id].atomic_group_id is not None]
431 if not atomic_ids:
432 return None
433 if len(atomic_ids) > 1:
434 raise SchedulerError('More than one atomic label on host.')
435 return atomic_ids[0]
436
437
438 def _get_atomic_group_labels(self, atomic_group_id):
439 """
440 Lookup the label ids that an atomic_group is associated with.
441
442 @param atomic_group_id - The id of the AtomicGroup to look up.
443
444 @returns A generator yeilding Label ids for this atomic group.
445 """
446 return (id for id, label in self._labels.iteritems()
447 if label.atomic_group_id == atomic_group_id
448 and not label.invalid)
449
450
showard54c1ea92009-05-20 00:32:58 +0000451 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000452 """
453 @param group_hosts - A sequence of Host ids to test for usability
454 and eligibility against the Job associated with queue_entry.
455 @param queue_entry - The HostQueueEntry that these hosts are being
456 tested for eligibility against.
457
458 @returns A subset of group_hosts Host ids that are eligible for the
459 supplied queue_entry.
460 """
461 return set(host_id for host_id in group_hosts
462 if self._is_host_usable(host_id)
463 and self._is_host_eligible_for_job(host_id, queue_entry))
464
465
showard989f25d2008-10-01 11:38:11 +0000466 def _is_host_eligible_for_job(self, host_id, queue_entry):
467 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
468 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000469
showard89f84db2009-03-12 20:39:13 +0000470 return (self._is_acl_accessible(host_id, queue_entry) and
471 self._check_job_dependencies(job_dependencies, host_labels) and
472 self._check_only_if_needed_labels(
473 job_dependencies, host_labels, queue_entry) and
474 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000475
476
showard63a34772008-08-18 19:32:50 +0000477 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000478 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000479 return None
480 return self._hosts_available.pop(queue_entry.host_id, None)
481
482
483 def _is_host_usable(self, host_id):
484 if host_id not in self._hosts_available:
485 # host was already used during this scheduling cycle
486 return False
487 if self._hosts_available[host_id].invalid:
488 # Invalid hosts cannot be used for metahosts. They're included in
489 # the original query because they can be used by non-metahosts.
490 return False
491 return True
492
493
494 def _schedule_metahost(self, queue_entry):
495 label_id = queue_entry.meta_host
496 hosts_in_label = self._label_hosts.get(label_id, set())
497 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
498 set())
499
500 # must iterate over a copy so we can mutate the original while iterating
501 for host_id in list(hosts_in_label):
502 if not self._is_host_usable(host_id):
503 hosts_in_label.remove(host_id)
504 continue
505 if host_id in ineligible_host_ids:
506 continue
showard989f25d2008-10-01 11:38:11 +0000507 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000508 continue
509
showard89f84db2009-03-12 20:39:13 +0000510 # Remove the host from our cached internal state before returning
511 # the host object.
showard63a34772008-08-18 19:32:50 +0000512 hosts_in_label.remove(host_id)
513 return self._hosts_available.pop(host_id)
514 return None
515
516
517 def find_eligible_host(self, queue_entry):
518 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000519 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000520 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000521 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000522 return self._schedule_metahost(queue_entry)
523
524
showard89f84db2009-03-12 20:39:13 +0000525 def find_eligible_atomic_group(self, queue_entry):
526 """
527 Given an atomic group host queue entry, locate an appropriate group
528 of hosts for the associated job to run on.
529
530 The caller is responsible for creating new HQEs for the additional
531 hosts returned in order to run the actual job on them.
532
533 @returns A list of Host instances in a ready state to satisfy this
534 atomic group scheduling. Hosts will all belong to the same
535 atomic group label as specified by the queue_entry.
536 An empty list will be returned if no suitable atomic
537 group could be found.
538
539 TODO(gps): what is responsible for kicking off any attempted repairs on
540 a group of hosts? not this function, but something needs to. We do
541 not communicate that reason for returning [] outside of here...
542 For now, we'll just be unschedulable if enough hosts within one group
543 enter Repair Failed state.
544 """
545 assert queue_entry.atomic_group_id is not None
546 job = queue_entry.job
547 assert job.synch_count and job.synch_count > 0
548 atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
549 if job.synch_count > atomic_group.max_number_of_machines:
550 # Such a Job and HostQueueEntry should never be possible to
551 # create using the frontend. Regardless, we can't process it.
552 # Abort it immediately and log an error on the scheduler.
553 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000554 logging.error(
555 'Error: job %d synch_count=%d > requested atomic_group %d '
556 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
557 job.id, job.synch_count, atomic_group.id,
558 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000559 return []
560 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
561 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
562 set())
563
564 # Look in each label associated with atomic_group until we find one with
565 # enough hosts to satisfy the job.
566 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
567 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
568 if queue_entry.meta_host is not None:
569 # If we have a metahost label, only allow its hosts.
570 group_hosts.intersection_update(hosts_in_label)
571 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000572 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000573 group_hosts, queue_entry)
574
575 # Job.synch_count is treated as "minimum synch count" when
576 # scheduling for an atomic group of hosts. The atomic group
577 # number of machines is the maximum to pick out of a single
578 # atomic group label for scheduling at one time.
579 min_hosts = job.synch_count
580 max_hosts = atomic_group.max_number_of_machines
581
showard54c1ea92009-05-20 00:32:58 +0000582 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000583 # Not enough eligible hosts in this atomic group label.
584 continue
585
showard54c1ea92009-05-20 00:32:58 +0000586 eligible_hosts_in_group = [self._hosts_available[id]
587 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000588 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000589 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000590
showard89f84db2009-03-12 20:39:13 +0000591 # Limit ourselves to scheduling the atomic group size.
592 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000593 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000594
595 # Remove the selected hosts from our cached internal state
596 # of available hosts in order to return the Host objects.
597 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000598 for host in eligible_hosts_in_group:
599 hosts_in_label.discard(host.id)
600 self._hosts_available.pop(host.id)
601 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000602 return host_list
603
604 return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
    def __init__(self):
        # all live Agents, in no particular order
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id -> set of Agents touching that host
        self._host_agents = {}
        # HostQueueEntry id -> set of Agents touching that entry
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
showard915958d2009-04-22 21:00:58 +0000620 def initialize(self, recover_hosts=True):
621 self._periodic_cleanup.initialize()
622 self._24hr_upkeep.initialize()
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 # always recover processes
625 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000626
jadmanski0afbb632008-06-06 21:10:57 +0000627 if recover_hosts:
628 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
jadmanski0afbb632008-06-06 21:10:57 +0000631 def tick(self):
showard170873e2009-01-07 00:22:26 +0000632 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000633 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000634 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000635 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000636 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000637 self._schedule_new_jobs()
638 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000639 _drone_manager.execute_actions()
640 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
mblighf3294cc2009-04-08 21:17:38 +0000643 def _run_cleanup(self):
644 self._periodic_cleanup.run_cleanup_maybe()
645 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
    def add_agent(self, agent):
        """Start tracking agent: add it to the active list, point it back at
        this dispatcher, and index it by its host and queue entry ids."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
    def _recover_processes(self):
        """
        Startup recovery: re-attach to surviving processes, requeue or abort
        the remaining active entries, and reverify hosts left in transient
        states.

        Pidfiles are registered before the drone manager refresh so the
        refresh can pick them up (presumably -- see _register_pidfiles).
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000699
showard170873e2009-01-07 00:22:26 +0000700
701 def _register_pidfiles(self):
702 # during recovery we may need to read pidfiles for both running and
703 # parsing entries
704 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000705 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000706 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000707 for pidfile_name in _ALL_PIDFILE_NAMES:
708 pidfile_id = _drone_manager.get_pidfile_id_from(
709 queue_entry.execution_tag(), pidfile_name=pidfile_name)
710 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000711
712
showardd3dc1992009-04-22 21:01:40 +0000713 def _recover_entries_with_status(self, status, orphans, pidfile_name,
714 recover_entries_fn):
715 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000716 for queue_entry in queue_entries:
717 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000718 # synchronous job we've already recovered
719 continue
showardd3dc1992009-04-22 21:01:40 +0000720 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000721 execution_tag = queue_entry.execution_tag()
722 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000723 run_monitor.attach_to_existing_process(execution_tag,
724 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000725
726 log_message = ('Recovering %s entry %s ' %
727 (status.lower(),
728 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000729 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000730 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000731 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000732 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000733 continue
mbligh90a549d2008-03-25 23:52:34 +0000734
showard597bfd32009-05-08 18:22:50 +0000735 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000736 run_monitor.get_process())
737 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
738 orphans.discard(run_monitor.get_process())
739
740
741 def _kill_remaining_orphan_processes(self, orphans):
742 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000743 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000744 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000745
showard170873e2009-01-07 00:22:26 +0000746
showardd3dc1992009-04-22 21:01:40 +0000747 def _recover_running_entries(self, orphans):
748 def recover_entries(job, queue_entries, run_monitor):
749 if run_monitor is not None:
750 queue_task = RecoveryQueueTask(job=job,
751 queue_entries=queue_entries,
752 run_monitor=run_monitor)
753 self.add_agent(Agent(tasks=[queue_task],
754 num_processes=len(queue_entries)))
755 # else, _requeue_other_active_entries will cover this
756
757 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
758 orphans, '.autoserv_execute',
759 recover_entries)
760
761
762 def _recover_gathering_entries(self, orphans):
763 def recover_entries(job, queue_entries, run_monitor):
764 gather_task = GatherLogsTask(job, queue_entries,
765 run_monitor=run_monitor)
766 self.add_agent(Agent([gather_task]))
767
768 self._recover_entries_with_status(
769 models.HostQueueEntry.Status.GATHERING,
770 orphans, _CRASHINFO_PID_FILE, recover_entries)
771
772
773 def _recover_parsing_entries(self, orphans):
774 def recover_entries(job, queue_entries, run_monitor):
775 reparse_task = FinalReparseTask(queue_entries,
776 run_monitor=run_monitor)
777 self.add_agent(Agent([reparse_task], num_processes=0))
778
779 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
780 orphans, _PARSER_PID_FILE,
781 recover_entries)
782
783
    def _recover_all_recoverable_entries(self):
        """
        Re-attach agents to Running, Gathering and Parsing entries, then kill
        any orphaned autoserv processes that no entry claimed (each recovery
        pass discards the processes it claims from the orphans set).
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000790
showard97aed502008-11-04 02:01:24 +0000791
showard170873e2009-01-07 00:22:26 +0000792 def _requeue_other_active_entries(self):
793 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000794 where='active AND NOT complete AND '
795 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000796 for queue_entry in queue_entries:
797 if self.get_agents_for_entry(queue_entry):
798 # entry has already been recovered
799 continue
showardd3dc1992009-04-22 21:01:40 +0000800 if queue_entry.aborted:
801 queue_entry.abort(self)
802 continue
803
804 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000805 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000806 if queue_entry.host:
807 tasks = queue_entry.host.reverify_tasks()
808 self.add_agent(Agent(tasks))
809 agent = queue_entry.requeue()
810
811
showard1ff7b2e2009-05-15 23:17:18 +0000812 def _find_reverify(self):
showarda64e52a2009-06-08 23:24:08 +0000813 self._reverify_hosts_where("status = 'Reverify'", cleanup=False)
showard1ff7b2e2009-05-15 23:17:18 +0000814
815
    def _reverify_remaining_hosts(self):
        """
        Reverify hosts stuck in transient states at startup: hosts caught
        mid verify/repair/cleanup, plus 'Running' hosts that have no active
        queue entry (the latter indicates a scheduler bug and is reported).
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000831
832
jadmanski0afbb632008-06-06 21:10:57 +0000833 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000834 print_message='Reverifying host %s',
835 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000836 full_where='locked = 0 AND invalid = 0 AND ' + where
837 for host in Host.fetch(where=full_where):
838 if self.host_has_agent(host):
839 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000840 continue
showard170873e2009-01-07 00:22:26 +0000841 if print_message:
showardb18134f2009-03-20 20:52:18 +0000842 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000843 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000844 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000845
846
jadmanski0afbb632008-06-06 21:10:57 +0000847 def _recover_hosts(self):
848 # recover "Repair Failed" hosts
849 message = 'Reverifying dead host %s'
850 self._reverify_hosts_where("status = 'Repair Failed'",
851 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000852
853
showard04c82c52008-05-29 19:38:12 +0000854
    def _get_pending_queue_entries(self):
        """
        Fetch all queued (inactive, incomplete) HostQueueEntries.

        @returns a list ordered by job priority (highest first), then
                non-metahost before metahost, then FIFO by job id.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000861
862
showard89f84db2009-03-12 20:39:13 +0000863 def _refresh_pending_queue_entries(self):
864 """
865 Lookup the pending HostQueueEntries and call our HostScheduler
866 refresh() method given that list. Return the list.
867
868 @returns A list of pending HostQueueEntries sorted in priority order.
869 """
showard63a34772008-08-18 19:32:50 +0000870 queue_entries = self._get_pending_queue_entries()
871 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000872 return []
showardb95b1bd2008-08-15 18:11:04 +0000873
showard63a34772008-08-18 19:32:50 +0000874 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000875
showard89f84db2009-03-12 20:39:13 +0000876 return queue_entries
877
878
879 def _schedule_atomic_group(self, queue_entry):
880 """
881 Schedule the given queue_entry on an atomic group of hosts.
882
883 Returns immediately if there are insufficient available hosts.
884
885 Creates new HostQueueEntries based off of queue_entry for the
886 scheduled hosts and starts them all running.
887 """
888 # This is a virtual host queue entry representing an entire
889 # atomic group, find a group and schedule their hosts.
890 group_hosts = self._host_scheduler.find_eligible_atomic_group(
891 queue_entry)
892 if not group_hosts:
893 return
894 # The first assigned host uses the original HostQueueEntry
895 group_queue_entries = [queue_entry]
896 for assigned_host in group_hosts[1:]:
897 # Create a new HQE for every additional assigned_host.
898 new_hqe = HostQueueEntry.clone(queue_entry)
899 new_hqe.save()
900 group_queue_entries.append(new_hqe)
901 assert len(group_queue_entries) == len(group_hosts)
902 for queue_entry, host in itertools.izip(group_queue_entries,
903 group_hosts):
904 self._run_queue_entry(queue_entry, host)
905
906
907 def _schedule_new_jobs(self):
908 queue_entries = self._refresh_pending_queue_entries()
909 if not queue_entries:
910 return
911
showard63a34772008-08-18 19:32:50 +0000912 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000913 if (queue_entry.atomic_group_id is None or
914 queue_entry.host_id is not None):
915 assigned_host = self._host_scheduler.find_eligible_host(
916 queue_entry)
917 if assigned_host:
918 self._run_queue_entry(queue_entry, assigned_host)
919 else:
920 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000921
922
923 def _run_queue_entry(self, queue_entry, host):
924 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000925 # in some cases (synchronous jobs with run_verify=False), agent may be
926 # None
showard9976ce92008-10-15 20:28:13 +0000927 if agent:
928 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000929
930
jadmanski0afbb632008-06-06 21:10:57 +0000931 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000932 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
933 for agent in self.get_agents_for_entry(entry):
934 agent.abort()
935 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000936
937
showard324bf812009-01-20 23:23:38 +0000938 def _can_start_agent(self, agent, num_started_this_cycle,
939 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000940 # always allow zero-process agents to run
941 if agent.num_processes == 0:
942 return True
943 # don't allow any nonzero-process agents to run after we've reached a
944 # limit (this avoids starvation of many-process agents)
945 if have_reached_limit:
946 return False
947 # total process throttling
showard324bf812009-01-20 23:23:38 +0000948 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000949 return False
950 # if a single agent exceeds the per-cycle throttling, still allow it to
951 # run when it's the first agent in the cycle
952 if num_started_this_cycle == 0:
953 return True
954 # per-cycle throttling
955 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000956 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000957 return False
958 return True
959
960
    def _handle_agents(self):
        """
        One cycle of agent processing: drop finished agents, start idle
        agents that pass the throttling policy, and tick everything that is
        (now) running.  have_reached_limit latches once _can_start_agent
        first refuses a nonzero-process agent.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000979
980
    def _process_recurring_runs(self):
        """
        Launch every RecurringRun whose start_date has arrived, then either
        delete it (last iteration) or advance its start_date and decrement
        its loop count (loop_count == 0 means repeat forever).
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is fetched but never passed to
            # create_new_job below -- confirm whether it should be forwarded
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # best-effort: log and move on to the next recurring run
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1020
1021
class PidfileRunMonitor(object):
    """
    Tracks a single process through its pidfile via the global drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Return command wrapped in a 'nice' invocation when nice_level is
        truthy (a nice level of 0 is treated as no-op, which is equivalent)."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # remember when monitoring began, for pidfile-timeout detection
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Launch command through the drone manager and begin monitoring it.

        @param command: argv list to execute (required).
        @param working_directory: directory the process and pidfile live in.
        @param nice_level: optional niceness for the child process.
        """
        assert command is not None
        # use the helper instead of duplicating the nice-wrapping logic
        # inline (the helper was previously dead code)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Begin monitoring an already-launched process by execution tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True once the pidfile names a process."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid when has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # refresh self._state from the pidfile; raises _PidfileException on
        # invalid contents (after resetting state)
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # report the problem and treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the previous code bound the exception to an unused name
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001179
1180
class Agent(object):
    """
    Runs an ordered queue of tasks on behalf of the dispatcher.

    The dispatcher indexes agents by the host and queue-entry ids of their
    tasks (host_ids / queue_entry_ids) and drives them via tick().
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = None
        self.dispatcher = None
        self.num_processes = num_processes

        # union the id sets of all tasks so the dispatcher can index us
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # replace the task queue with a fresh, unbounded one
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        merged = set()
        for ids in id_lists:
            merged.update(ids)
        return merged


    def add_task(self, task):
        """Append task to the queue and adopt it."""
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """Poll the active task, advancing through the queue as tasks finish;
        returns as soon as a task is still in progress."""
        while not self.is_done():
            task = self.active_task
            if task:
                task.poll()
                if not task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished_task = self.active_task
        if finished_task is not None:
            assert finished_task.is_done()
            if not finished_task.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        """Drop all remaining tasks and schedule the failed task's
        failure_tasks in a brand-new Agent (so host_ids and queue_entry_ids
        get recomputed for them)."""
        self._clear_queue()
        failure_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(failure_agent)


    def is_running(self):
        """True while a task is actively being executed."""
        return self.active_task is not None


    def is_done(self):
        """True once no task is active and the queue is exhausted."""
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks one at a time until the queue empties or some task
        # refuses the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1257
showardd3dc1992009-04-22 21:01:40 +00001258
mbligh36768f02008-02-22 18:28:33 +00001259class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001260 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1261 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001262 self.done = False
1263 self.failure_tasks = failure_tasks
1264 self.started = False
1265 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001266 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001267 self.task = None
1268 self.agent = None
1269 self.monitor = None
1270 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001271 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001272 self.queue_entry_ids = []
1273 self.host_ids = []
1274 self.log_file = None
1275
1276
1277 def _set_ids(self, host=None, queue_entries=None):
1278 if queue_entries and queue_entries != [None]:
1279 self.host_ids = [entry.host.id for entry in queue_entries]
1280 self.queue_entry_ids = [entry.id for entry in queue_entries]
1281 else:
1282 assert host
1283 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001284
1285
jadmanski0afbb632008-06-06 21:10:57 +00001286 def poll(self):
showard08a36412009-05-05 01:01:13 +00001287 if not self.started:
1288 self.start()
1289 self.tick()
1290
1291
1292 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001293 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001294 exit_code = self.monitor.exit_code()
1295 if exit_code is None:
1296 return
1297 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001298 else:
1299 success = False
mbligh36768f02008-02-22 18:28:33 +00001300
jadmanski0afbb632008-06-06 21:10:57 +00001301 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001302
1303
jadmanski0afbb632008-06-06 21:10:57 +00001304 def is_done(self):
1305 return self.done
mbligh36768f02008-02-22 18:28:33 +00001306
1307
jadmanski0afbb632008-06-06 21:10:57 +00001308 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001309 if self.done:
1310 return
jadmanski0afbb632008-06-06 21:10:57 +00001311 self.done = True
1312 self.success = success
1313 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001314
1315
jadmanski0afbb632008-06-06 21:10:57 +00001316 def prolog(self):
1317 pass
mblighd64e5702008-04-04 21:39:28 +00001318
1319
jadmanski0afbb632008-06-06 21:10:57 +00001320 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001321 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001322
mbligh36768f02008-02-22 18:28:33 +00001323
jadmanski0afbb632008-06-06 21:10:57 +00001324 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001325 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001326 _drone_manager.copy_to_results_repository(
1327 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001328
1329
jadmanski0afbb632008-06-06 21:10:57 +00001330 def epilog(self):
1331 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001332
1333
jadmanski0afbb632008-06-06 21:10:57 +00001334 def start(self):
1335 assert self.agent
1336
1337 if not self.started:
1338 self.prolog()
1339 self.run()
1340
1341 self.started = True
1342
1343
1344 def abort(self):
1345 if self.monitor:
1346 self.monitor.kill()
1347 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001348 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001349 self.cleanup()
1350
1351
showard170873e2009-01-07 00:22:26 +00001352 def set_host_log_file(self, base_name, host):
1353 filename = '%s.%s' % (time.time(), base_name)
1354 self.log_file = os.path.join('hosts', host.hostname, filename)
1355
1356
showardde634ee2009-01-30 01:44:24 +00001357 def _get_consistent_execution_tag(self, queue_entries):
1358 first_execution_tag = queue_entries[0].execution_tag()
1359 for queue_entry in queue_entries[1:]:
1360 assert queue_entry.execution_tag() == first_execution_tag, (
1361 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1362 queue_entry,
1363 first_execution_tag,
1364 queue_entries[0]))
1365 return first_execution_tag
1366
1367
showarda1e74b32009-05-12 17:32:04 +00001368 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001369 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001370 if use_monitor is None:
1371 assert self.monitor
1372 use_monitor = self.monitor
1373 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001374 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001375 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001376 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001377 results_path)
showardde634ee2009-01-30 01:44:24 +00001378
showarda1e74b32009-05-12 17:32:04 +00001379
    def _parse_results(self, queue_entries):
        """Schedule a FinalReparseTask for queue_entries via a new Agent."""
        # num_processes=0: FinalReparseTask throttles itself against
        # max_parse_processes, so it shouldn't count toward dispatcher limits
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1383
1384
    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Copy results to the repository, then kick off the final reparse."""
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)
1388
1389
    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """
        Launch this task's command (if any) under a PidfileRunMonitor.

        @param pidfile_name - pidfile the new process will write.
        @param paired_with_pidfile - if given, run on the same drone as the
                process that owns that pidfile.
        """
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001398
1399
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval line as 'key=value'."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override this."""
        # bug fix: NotImplemented is a comparison singleton, not an exception;
        # raising it produced a TypeError.  NotImplementedError is correct.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file, if the job ran."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <unix timestamp>) keyval pair for job."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job completion time in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1427
1428
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host, optionally failing a queue entry."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and requeue the entry we may later fail."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals go in the repair's temporary results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """
        Fail queue_entry_to_fail after an unsuccessful repair: write keyvals,
        copy the repair logs into the entry's results, and mark the host
        failure (metahost and already-aborted entries are left alone).
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Set final host status; on failure, also fail the queue entry."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001499
1500
class PreJobTask(AgentTask):
    """
    Base for per-host tasks that run before a job (verify/cleanup).  When the
    task fails for a real (non-metahost) queue entry, its log file is copied
    into that entry's results so the failure is visible to the user.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # only copy the log for a failed, non-meta queue entry
        if not self.queue_entry or self.success or self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001513
1514
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host, with a RepairTask as fallback."""

    def __init__(self, queue_entry=None, host=None):
        """Exactly one of queue_entry or host must be supplied."""
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # if verify fails, fall back to repairing the host
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and entry, if any) as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success, the host becomes Ready; failure is handled by repair."""
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001545
1546
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs the main autoserv process for a job's group of queue entries."""

    def __init__(self, job, queue_entries, cmd, group_name=''):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job-level keyvals live in the execution directory
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict to the execution as a keyval file at keyval_path."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write platform/labels keyvals for one host under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries share a tag; the first entry's tag represents the group
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write pre-job keyvals and mark all entries/hosts as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file explaining why the job has no results
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: finish keyval, schedule log gathering, and
        fail entries whose autoserv process was lost."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO line with comment to the job's status.log."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, via keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001674
1675
class RecoveryQueueTask(QueueTask):
    """
    A QueueTask wrapping an autoserv process that was already running when the
    scheduler started, recovered through an existing run_monitor.  It is
    marked started at construction time, so prolog() and run() must never be
    invoked.
    """
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # bug fix: `raise NotImplemented(...)` tried to *call* the
        # NotImplemented singleton, which itself raises a TypeError; use the
        # proper NotImplementedError exception instead.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001690
1691
class PostJobTask(AgentTask):
    """
    Base for tasks that run after the main autoserv process of a job (log
    gathering, final parsing).  Attaches to the job's autoserv pidfile to
    determine the job's outcome.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # monitor on the *autoserv* process, used to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call
        self.monitor = run_monitor
        if run_monitor:
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """
        Return True if the job's queue entries were aborted.  Inconsistent
        abort states across entries are reported by email and treated as
        aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # bug fix: the original joined the *characters* of a single
                # formatted string; list every entry's abort state instead
                entry_states = ', '.join(
                    '%s (%s)' % (entry, entry.aborted)
                    for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entry_states)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map abort state and autoserv exit code to a final entry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results in post-job task',
                'No results in post-job task at %s' %
                self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
            pidfile_name=self._pidfile_name,
            paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every associated queue entry to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1780
1781
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """
        Reboot (via CleanupTask) or release each host, depending on the job's
        reboot_after policy, its final status, and test failures.
        """
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # if autoserv never produced a process there's nothing to copy/parse
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001849
1850
class CleanupTask(PreJobTask):
    """Runs autoserv --cleanup on a host, with a RepairTask as fallback."""

    def __init__(self, host=None, queue_entry=None):
        """Exactly one of host or queue_entry must be supplied."""
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        # if cleanup fails, fall back to repairing the host
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_task])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        """On success the host is Ready and no longer dirty."""
        super(CleanupTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
1882
1883
class FinalReparseTask(PostJobTask):
    """
    Runs the results parser over a finished job's results directory.  The
    number of concurrent parses is capped by
    scheduler_config.config.max_parse_processes; tasks over the cap wait and
    retry on each tick.
    """
    # class-wide count of currently-running parse processes
    _num_running_parses = 0

    def __init__(self, queue_entries, run_monitor=None):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               pidfile_name=_PARSER_PID_FILE,
                                               logfile_name='.parse.log',
                                               run_monitor=run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        # if we're recovering an existing parse process, it already counts as
        # started
        self._parse_started = (run_monitor is not None)


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        """True if we're below the configured concurrent-parse limit."""
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # entries get the status determined from the autoserv outcome
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        """Build the parser command line over results_dir."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Start the parse process if under the limit; otherwise do nothing."""
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a slot if we actually took one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001957
1958
class SetEntryPendingTask(AgentTask):
    """
    A process-less task that invokes a queue entry's on_pending() transition
    and schedules any follow-up agent it returns.
    """
    def __init__(self, queue_entry):
        # cmd='' -- this task never spawns an external process
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """Run on_pending(), queue its resulting agent, finish immediately."""
        agent = self._queue_entry.on_pending()
        if agent:
            self.agent.dispatcher.add_agent(agent)
        self.finished(True)
1971
1972
class DBError(Exception):
    """
    Raised by the DBObject constructor (via _fetch_row_from_db) when the
    requested row cannot be found in the database.
    """
1975
1976
mbligh36768f02008-02-22 18:28:33 +00001977class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001978 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001979
1980 # Subclasses MUST override these:
1981 _table_name = ''
1982 _fields = ()
1983
showarda3c58572009-03-12 20:36:59 +00001984 # A mapping from (type, id) to the instance of the object for that
1985 # particular id. This prevents us from creating new Job() and Host()
1986 # instances for every HostQueueEntry object that we instantiate as
1987 # multiple HQEs often share the same Job.
1988 _instances_by_type_and_id = weakref.WeakValueDictionary()
1989 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001990
showarda3c58572009-03-12 20:36:59 +00001991
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                # reuse the cached instance; __init__ will still run on it
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2002
2003
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Construct (or re-initialize) a DBObject from an id or a raw row.

        @param id - primary key to SELECT by (exclusive with row).
        @param row - a full result row to populate fields from.
        @param new_record - True for a row not yet saved to the database.
        @param always_query - if False and this (cached) instance was already
                initialized, skip re-reading its fields.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a cached instance being re-queried: log any field drift
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2032
2033
    @classmethod
    def _clear_instance_cache(cls):
        """
        Used for testing: clear the shared (type, id) -> instance cache so
        subsequent constructions re-query the database.
        """
        cls._instances_by_type_and_id.clear()
2038
2039
    def _fetch_row_from_db(self, row_id):
        """SELECT and return the row for row_id; raise DBError if missing."""
        # table name is interpolated; the id goes through a bind parameter
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]
2047
2048
showarda3c58572009-03-12 20:36:59 +00002049 def _assert_row_length(self, row):
2050 assert len(row) == len(self._fields), (
2051 "table = %s, row = %s/%d, fields = %s/%d" % (
2052 self.__table, row, len(row), self._fields, len(self._fields)))
2053
2054
2055 def _compare_fields_in_row(self, row):
2056 """
2057 Given a row as returned by a SELECT query, compare it to our existing
2058 in memory fields.
2059
2060 @param row - A sequence of values corresponding to fields named in
2061 The class attribute _fields.
2062
2063 @returns A dictionary listing the differences keyed by field name
2064 containing tuples of (current_value, row_value).
2065 """
2066 self._assert_row_length(row)
2067 differences = {}
2068 for field, row_value in itertools.izip(self._fields, row):
2069 current_value = getattr(self, field)
2070 if current_value != row_value:
2071 differences[field] = (current_value, row_value)
2072 return differences
showard2bab8f42008-11-12 18:15:22 +00002073
2074
2075 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002076 """
2077 Update our field attributes using a single row returned by SELECT.
2078
2079 @param row - A sequence of values corresponding to fields named in
2080 the class fields list.
2081 """
2082 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002083
showard2bab8f42008-11-12 18:15:22 +00002084 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002085 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002086 setattr(self, field, value)
2087 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002088
showard2bab8f42008-11-12 18:15:22 +00002089 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002090
mblighe2586682008-02-29 22:45:46 +00002091
showardccbd6c52009-03-21 00:10:21 +00002092 def update_from_database(self):
2093 assert self.id is not None
2094 row = self._fetch_row_from_db(self.id)
2095 self._update_fields_from_row(row)
2096
2097
jadmanski0afbb632008-06-06 21:10:57 +00002098 def count(self, where, table = None):
2099 if not table:
2100 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002101
jadmanski0afbb632008-06-06 21:10:57 +00002102 rows = _db.execute("""
2103 SELECT count(*) FROM %s
2104 WHERE %s
2105 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002106
jadmanski0afbb632008-06-06 21:10:57 +00002107 assert len(rows) == 1
2108
2109 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002110
2111
showardd3dc1992009-04-22 21:01:40 +00002112 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002113 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002114
showard2bab8f42008-11-12 18:15:22 +00002115 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002116 return
mbligh36768f02008-02-22 18:28:33 +00002117
mblighf8c624d2008-07-03 16:58:45 +00002118 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002119 _db.execute(query, (value, self.id))
2120
showard2bab8f42008-11-12 18:15:22 +00002121 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002122
2123
jadmanski0afbb632008-06-06 21:10:57 +00002124 def save(self):
2125 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002126 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002127 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002128 values = []
2129 for key in keys:
2130 value = getattr(self, key)
2131 if value is None:
2132 values.append('NULL')
2133 else:
2134 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002135 values_str = ','.join(values)
2136 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2137 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002138 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002139 # Update our id to the one the database just assigned to us.
2140 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002141
2142
jadmanski0afbb632008-06-06 21:10:57 +00002143 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002144 self._instances_by_type_and_id.pop((type(self), id), None)
2145 self._initialized = False
2146 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002147 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2148 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002149
2150
showard63a34772008-08-18 19:32:50 +00002151 @staticmethod
2152 def _prefix_with(string, prefix):
2153 if string:
2154 string = prefix + string
2155 return string
2156
2157
jadmanski0afbb632008-06-06 21:10:57 +00002158 @classmethod
showard989f25d2008-10-01 11:38:11 +00002159 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002160 """
2161 Construct instances of our class based on the given database query.
2162
2163 @yields One class instance for each row fetched.
2164 """
showard63a34772008-08-18 19:32:50 +00002165 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2166 where = cls._prefix_with(where, 'WHERE ')
2167 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002168 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002169 'joins' : joins,
2170 'where' : where,
2171 'order_by' : order_by})
2172 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002173 for row in rows:
2174 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002175
mbligh36768f02008-02-22 18:28:33 +00002176
class IneligibleHostQueue(DBObject):
    # Records a (job, host) block: once a host has been assigned to a job,
    # a row here keeps the scheduler from picking it again for that job.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002180
2181
showard89f84db2009-03-12 20:39:13 +00002182class AtomicGroup(DBObject):
2183 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002184 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2185 'invalid')
showard89f84db2009-03-12 20:39:13 +00002186
2187
showard989f25d2008-10-01 11:38:11 +00002188class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002189 _table_name = 'labels'
2190 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002191 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002192
2193
mbligh36768f02008-02-22 18:28:33 +00002194class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002195 _table_name = 'hosts'
2196 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2197 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2198
2199
jadmanski0afbb632008-06-06 21:10:57 +00002200 def current_task(self):
2201 rows = _db.execute("""
2202 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2203 """, (self.id,))
2204
2205 if len(rows) == 0:
2206 return None
2207 else:
2208 assert len(rows) == 1
2209 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002210 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002211
2212
jadmanski0afbb632008-06-06 21:10:57 +00002213 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002214 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002215 if self.current_task():
2216 self.current_task().requeue()
2217
showard6ae5ea92009-02-25 00:11:51 +00002218
jadmanski0afbb632008-06-06 21:10:57 +00002219 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002220 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002221 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002222
2223
showard170873e2009-01-07 00:22:26 +00002224 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002225 """
showard170873e2009-01-07 00:22:26 +00002226 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002227 """
2228 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002229 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002230 FROM labels
2231 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002232 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002233 ORDER BY labels.name
2234 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002235 platform = None
2236 all_labels = []
2237 for label_name, is_platform in rows:
2238 if is_platform:
2239 platform = label_name
2240 all_labels.append(label_name)
2241 return platform, all_labels
2242
2243
showarda64e52a2009-06-08 23:24:08 +00002244 def reverify_tasks(self, cleanup=True):
2245 tasks = [VerifyTask(host=self)]
2246 if cleanup:
2247 tasks.insert(0, CleanupTask(host=self))
showard170873e2009-01-07 00:22:26 +00002248 # just to make sure this host does not get taken away
2249 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002250 return tasks
showardd8e548a2008-09-09 03:04:57 +00002251
2252
showard54c1ea92009-05-20 00:32:58 +00002253 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2254
2255
2256 @classmethod
2257 def cmp_for_sort(cls, a, b):
2258 """
2259 A comparison function for sorting Host objects by hostname.
2260
2261 This strips any trailing numeric digits, ignores leading 0s and
2262 compares hostnames by the leading name and the trailing digits as a
2263 number. If both hostnames do not match this pattern, they are simply
2264 compared as lower case strings.
2265
2266 Example of how hostnames will be sorted:
2267
2268 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2269
2270 This hopefully satisfy most people's hostname sorting needs regardless
2271 of their exact naming schemes. Nobody sane should have both a host10
2272 and host010 (but the algorithm works regardless).
2273 """
2274 lower_a = a.hostname.lower()
2275 lower_b = b.hostname.lower()
2276 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2277 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2278 if match_a and match_b:
2279 name_a, number_a_str = match_a.groups()
2280 name_b, number_b_str = match_b.groups()
2281 number_a = int(number_a_str.lstrip('0'))
2282 number_b = int(number_b_str.lstrip('0'))
2283 result = cmp((name_a, number_a), (name_b, number_b))
2284 if result == 0 and lower_a != lower_b:
2285 # If they compared equal above but the lower case names are
2286 # indeed different, don't report equality. abc012 != abc12.
2287 return cmp(lower_a, lower_b)
2288 return result
2289 else:
2290 return cmp(lower_a, lower_b)
2291
2292
mbligh36768f02008-02-22 18:28:33 +00002293class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002294 _table_name = 'host_queue_entries'
2295 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002296 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002297 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002298
2299
showarda3c58572009-03-12 20:36:59 +00002300 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002301 assert id or row
showarda3c58572009-03-12 20:36:59 +00002302 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002303 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002304
jadmanski0afbb632008-06-06 21:10:57 +00002305 if self.host_id:
2306 self.host = Host(self.host_id)
2307 else:
2308 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002309
showard170873e2009-01-07 00:22:26 +00002310 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002311 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002312
2313
showard89f84db2009-03-12 20:39:13 +00002314 @classmethod
2315 def clone(cls, template):
2316 """
2317 Creates a new row using the values from a template instance.
2318
2319 The new instance will not exist in the database or have a valid
2320 id attribute until its save() method is called.
2321 """
2322 assert isinstance(template, cls)
2323 new_row = [getattr(template, field) for field in cls._fields]
2324 clone = cls(row=new_row, new_record=True)
2325 clone.id = None
2326 return clone
2327
2328
showardc85c21b2008-11-24 22:17:37 +00002329 def _view_job_url(self):
2330 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2331
2332
showardf1ae3542009-05-11 19:26:02 +00002333 def get_labels(self):
2334 """
2335 Get all labels associated with this host queue entry (either via the
2336 meta_host or as a job dependency label). The labels yielded are not
2337 guaranteed to be unique.
2338
2339 @yields Label instances associated with this host_queue_entry.
2340 """
2341 if self.meta_host:
2342 yield Label(id=self.meta_host, always_query=False)
2343 labels = Label.fetch(
2344 joins="JOIN jobs_dependency_labels AS deps "
2345 "ON (labels.id = deps.label_id)",
2346 where="deps.job_id = %d" % self.job.id)
2347 for label in labels:
2348 yield label
2349
2350
jadmanski0afbb632008-06-06 21:10:57 +00002351 def set_host(self, host):
2352 if host:
2353 self.queue_log_record('Assigning host ' + host.hostname)
2354 self.update_field('host_id', host.id)
2355 self.update_field('active', True)
2356 self.block_host(host.id)
2357 else:
2358 self.queue_log_record('Releasing host')
2359 self.unblock_host(self.host.id)
2360 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002361
jadmanski0afbb632008-06-06 21:10:57 +00002362 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002363
2364
jadmanski0afbb632008-06-06 21:10:57 +00002365 def get_host(self):
2366 return self.host
mbligh36768f02008-02-22 18:28:33 +00002367
2368
jadmanski0afbb632008-06-06 21:10:57 +00002369 def queue_log_record(self, log_line):
2370 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002371 _drone_manager.write_lines_to_file(self.queue_log_path,
2372 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002373
2374
jadmanski0afbb632008-06-06 21:10:57 +00002375 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002376 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002377 row = [0, self.job.id, host_id]
2378 block = IneligibleHostQueue(row=row, new_record=True)
2379 block.save()
mblighe2586682008-02-29 22:45:46 +00002380
2381
jadmanski0afbb632008-06-06 21:10:57 +00002382 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002383 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002384 blocks = IneligibleHostQueue.fetch(
2385 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2386 for block in blocks:
2387 block.delete()
mblighe2586682008-02-29 22:45:46 +00002388
2389
showard2bab8f42008-11-12 18:15:22 +00002390 def set_execution_subdir(self, subdir=None):
2391 if subdir is None:
2392 assert self.get_host()
2393 subdir = self.get_host().hostname
2394 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002395
2396
showard6355f6b2008-12-05 18:52:13 +00002397 def _get_hostname(self):
2398 if self.host:
2399 return self.host.hostname
2400 return 'no host'
2401
2402
showard170873e2009-01-07 00:22:26 +00002403 def __str__(self):
2404 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2405
2406
jadmanski0afbb632008-06-06 21:10:57 +00002407 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002408 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002409
showardb18134f2009-03-20 20:52:18 +00002410 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002411
showardc85c21b2008-11-24 22:17:37 +00002412 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002413 self.update_field('complete', False)
2414 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002415
jadmanski0afbb632008-06-06 21:10:57 +00002416 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002417 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002418 self.update_field('complete', False)
2419 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002420
showardc85c21b2008-11-24 22:17:37 +00002421 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002422 self.update_field('complete', True)
2423 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002424
2425 should_email_status = (status.lower() in _notify_email_statuses or
2426 'all' in _notify_email_statuses)
2427 if should_email_status:
2428 self._email_on_status(status)
2429
2430 self._email_on_job_complete()
2431
2432
2433 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002434 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002435
2436 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2437 self.job.id, self.job.name, hostname, status)
2438 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2439 self.job.id, self.job.name, hostname, status,
2440 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002441 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002442
2443
2444 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002445 if not self.job.is_finished():
2446 return
showard542e8402008-09-19 20:16:18 +00002447
showardc85c21b2008-11-24 22:17:37 +00002448 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002449 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002450 for queue_entry in hosts_queue:
2451 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002452 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002453 queue_entry.status))
2454
2455 summary_text = "\n".join(summary_text)
2456 status_counts = models.Job.objects.get_status_counts(
2457 [self.job.id])[self.job.id]
2458 status = ', '.join('%d %s' % (count, status) for status, count
2459 in status_counts.iteritems())
2460
2461 subject = 'Autotest: Job ID: %s "%s" %s' % (
2462 self.job.id, self.job.name, status)
2463 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2464 self.job.id, self.job.name, status, self._view_job_url(),
2465 summary_text)
showard170873e2009-01-07 00:22:26 +00002466 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002467
2468
showard89f84db2009-03-12 20:39:13 +00002469 def run(self, assigned_host=None):
2470 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002471 assert assigned_host
2472 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002473 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002474
showardb18134f2009-03-20 20:52:18 +00002475 logging.info("%s/%s/%s scheduled on %s, status=%s",
2476 self.job.name, self.meta_host, self.atomic_group_id,
2477 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002478
jadmanski0afbb632008-06-06 21:10:57 +00002479 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002480
showard6ae5ea92009-02-25 00:11:51 +00002481
jadmanski0afbb632008-06-06 21:10:57 +00002482 def requeue(self):
2483 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002484 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002485 # verify/cleanup failure sets the execution subdir, so reset it here
2486 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002487 if self.meta_host:
2488 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002489
2490
jadmanski0afbb632008-06-06 21:10:57 +00002491 def handle_host_failure(self):
2492 """\
2493 Called when this queue entry's host has failed verification and
2494 repair.
2495 """
2496 assert not self.meta_host
2497 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002498 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002499
2500
jadmanskif7fa2cc2008-10-01 14:13:23 +00002501 @property
2502 def aborted_by(self):
2503 self._load_abort_info()
2504 return self._aborted_by
2505
2506
2507 @property
2508 def aborted_on(self):
2509 self._load_abort_info()
2510 return self._aborted_on
2511
2512
2513 def _load_abort_info(self):
2514 """ Fetch info about who aborted the job. """
2515 if hasattr(self, "_aborted_by"):
2516 return
2517 rows = _db.execute("""
2518 SELECT users.login, aborted_host_queue_entries.aborted_on
2519 FROM aborted_host_queue_entries
2520 INNER JOIN users
2521 ON users.id = aborted_host_queue_entries.aborted_by_id
2522 WHERE aborted_host_queue_entries.queue_entry_id = %s
2523 """, (self.id,))
2524 if rows:
2525 self._aborted_by, self._aborted_on = rows[0]
2526 else:
2527 self._aborted_by = self._aborted_on = None
2528
2529
showardb2e2c322008-10-14 17:33:55 +00002530 def on_pending(self):
2531 """
2532 Called when an entry in a synchronous job has passed verify. If the
2533 job is ready to run, returns an agent to run the job. Returns None
2534 otherwise.
2535 """
2536 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002537 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002538 if self.job.is_ready():
2539 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002540 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002541 return None
2542
2543
showardd3dc1992009-04-22 21:01:40 +00002544 def abort(self, dispatcher):
2545 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002546
showardd3dc1992009-04-22 21:01:40 +00002547 Status = models.HostQueueEntry.Status
2548 has_running_job_agent = (
2549 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2550 and dispatcher.get_agents_for_entry(self))
2551 if has_running_job_agent:
2552 # do nothing; post-job tasks will finish and then mark this entry
2553 # with status "Aborted" and take care of the host
2554 return
2555
2556 if self.status in (Status.STARTING, Status.PENDING):
2557 self.host.set_status(models.Host.Status.READY)
2558 elif self.status == Status.VERIFYING:
2559 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2560
2561 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002562
2563 def execution_tag(self):
2564 assert self.execution_subdir
2565 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002566
2567
mbligh36768f02008-02-22 18:28:33 +00002568class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002569 _table_name = 'jobs'
2570 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2571 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002572 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002573 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002574
2575
showarda3c58572009-03-12 20:36:59 +00002576 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002577 assert id or row
showarda3c58572009-03-12 20:36:59 +00002578 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002579
mblighe2586682008-02-29 22:45:46 +00002580
jadmanski0afbb632008-06-06 21:10:57 +00002581 def is_server_job(self):
2582 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002583
2584
showard170873e2009-01-07 00:22:26 +00002585 def tag(self):
2586 return "%s-%s" % (self.id, self.owner)
2587
2588
jadmanski0afbb632008-06-06 21:10:57 +00002589 def get_host_queue_entries(self):
2590 rows = _db.execute("""
2591 SELECT * FROM host_queue_entries
2592 WHERE job_id= %s
2593 """, (self.id,))
2594 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002595
jadmanski0afbb632008-06-06 21:10:57 +00002596 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002597
jadmanski0afbb632008-06-06 21:10:57 +00002598 return entries
mbligh36768f02008-02-22 18:28:33 +00002599
2600
jadmanski0afbb632008-06-06 21:10:57 +00002601 def set_status(self, status, update_queues=False):
2602 self.update_field('status',status)
2603
2604 if update_queues:
2605 for queue_entry in self.get_host_queue_entries():
2606 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002607
2608
jadmanski0afbb632008-06-06 21:10:57 +00002609 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002610 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2611 status='Pending')
2612 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002613
2614
jadmanski0afbb632008-06-06 21:10:57 +00002615 def num_machines(self, clause = None):
2616 sql = "job_id=%s" % self.id
2617 if clause:
2618 sql += " AND (%s)" % clause
2619 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002620
2621
jadmanski0afbb632008-06-06 21:10:57 +00002622 def num_queued(self):
2623 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002624
2625
jadmanski0afbb632008-06-06 21:10:57 +00002626 def num_active(self):
2627 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002628
2629
jadmanski0afbb632008-06-06 21:10:57 +00002630 def num_complete(self):
2631 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002632
2633
jadmanski0afbb632008-06-06 21:10:57 +00002634 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002635 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002636
mbligh36768f02008-02-22 18:28:33 +00002637
showard6bb7c292009-01-30 01:44:51 +00002638 def _not_yet_run_entries(self, include_verifying=True):
2639 statuses = [models.HostQueueEntry.Status.QUEUED,
2640 models.HostQueueEntry.Status.PENDING]
2641 if include_verifying:
2642 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2643 return models.HostQueueEntry.objects.filter(job=self.id,
2644 status__in=statuses)
2645
2646
2647 def _stop_all_entries(self):
2648 entries_to_stop = self._not_yet_run_entries(
2649 include_verifying=False)
2650 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002651 assert not child_entry.complete, (
2652 '%s status=%s, active=%s, complete=%s' %
2653 (child_entry.id, child_entry.status, child_entry.active,
2654 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002655 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2656 child_entry.host.status = models.Host.Status.READY
2657 child_entry.host.save()
2658 child_entry.status = models.HostQueueEntry.Status.STOPPED
2659 child_entry.save()
2660
showard2bab8f42008-11-12 18:15:22 +00002661 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002662 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002663 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002664 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002665
2666
jadmanski0afbb632008-06-06 21:10:57 +00002667 def write_to_machines_file(self, queue_entry):
2668 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002669 file_path = os.path.join(self.tag(), '.machines')
2670 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002671
2672
showardf1ae3542009-05-11 19:26:02 +00002673 def _next_group_name(self, group_name=''):
2674 """@returns a directory name to use for the next host group results."""
2675 if group_name:
2676 # Sanitize for use as a pathname.
2677 group_name = group_name.replace(os.path.sep, '_')
2678 if group_name.startswith('.'):
2679 group_name = '_' + group_name[1:]
2680 # Add a separator between the group name and 'group%d'.
2681 group_name += '.'
2682 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002683 query = models.HostQueueEntry.objects.filter(
2684 job=self.id).values('execution_subdir').distinct()
2685 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002686 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2687 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002688 if ids:
2689 next_id = max(ids) + 1
2690 else:
2691 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002692 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002693
2694
showard170873e2009-01-07 00:22:26 +00002695 def _write_control_file(self, execution_tag):
2696 control_path = _drone_manager.attach_file_to_execution(
2697 execution_tag, self.control_file)
2698 return control_path
mbligh36768f02008-02-22 18:28:33 +00002699
showardb2e2c322008-10-14 17:33:55 +00002700
showard2bab8f42008-11-12 18:15:22 +00002701 def get_group_entries(self, queue_entry_from_group):
2702 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002703 return list(HostQueueEntry.fetch(
2704 where='job_id=%s AND execution_subdir=%s',
2705 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002706
2707
showardb2e2c322008-10-14 17:33:55 +00002708 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002709 assert queue_entries
2710 execution_tag = queue_entries[0].execution_tag()
2711 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002712 hostnames = ','.join([entry.get_host().hostname
2713 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002714
showard87ba02a2009-04-20 19:37:32 +00002715 params = _autoserv_command_line(
2716 hostnames, execution_tag,
2717 ['-P', execution_tag, '-n',
2718 _drone_manager.absolute_path(control_path)],
2719 job=self)
mbligh36768f02008-02-22 18:28:33 +00002720
jadmanski0afbb632008-06-06 21:10:57 +00002721 if not self.is_server_job():
2722 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002723
showardb2e2c322008-10-14 17:33:55 +00002724 return params
mblighe2586682008-02-29 22:45:46 +00002725
mbligh36768f02008-02-22 18:28:33 +00002726
showardc9ae1782009-01-30 01:42:37 +00002727 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002728 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002729 return True
showard0fc38302008-10-23 00:44:07 +00002730 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002731 return queue_entry.get_host().dirty
2732 return False
showard21baa452008-10-21 00:08:39 +00002733
showardc9ae1782009-01-30 01:42:37 +00002734
2735 def _should_run_verify(self, queue_entry):
2736 do_not_verify = (queue_entry.host.protection ==
2737 host_protections.Protection.DO_NOT_VERIFY)
2738 if do_not_verify:
2739 return False
2740 return self.run_verify
2741
2742
2743 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002744 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002745 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002746 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002747 if self._should_run_verify(queue_entry):
2748 tasks.append(VerifyTask(queue_entry=queue_entry))
2749 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002750 return tasks
2751
2752
showardf1ae3542009-05-11 19:26:02 +00002753 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002754 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002755 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002756 else:
showardf1ae3542009-05-11 19:26:02 +00002757 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002758 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002759 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002760 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002761
2762 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002763 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002764
2765
2766 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002767 """
2768 @returns A tuple containing a list of HostQueueEntry instances to be
2769 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002770 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002771 """
2772 if include_queue_entry.atomic_group_id:
2773 atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
2774 always_query=False)
2775 else:
2776 atomic_group = None
2777
showard2bab8f42008-11-12 18:15:22 +00002778 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002779 if atomic_group:
2780 num_entries_wanted = atomic_group.max_number_of_machines
2781 else:
2782 num_entries_wanted = self.synch_count
2783 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002784
showardf1ae3542009-05-11 19:26:02 +00002785 if num_entries_wanted > 0:
2786 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002787 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002788 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002789 params=(self.id, include_queue_entry.id)))
2790
2791 # Sort the chosen hosts by hostname before slicing.
2792 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2793 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2794 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2795 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002796
showardf1ae3542009-05-11 19:26:02 +00002797 # Sanity check. We'll only ever be called if this can be met.
2798 assert len(chosen_entries) >= self.synch_count
2799
2800 if atomic_group:
2801 # Look at any meta_host and dependency labels and pick the first
2802 # one that also specifies this atomic group. Use that label name
2803 # as the group name if possible (it is more specific).
2804 group_name = atomic_group.name
2805 for label in include_queue_entry.get_labels():
2806 if label.atomic_group_id:
2807 assert label.atomic_group_id == atomic_group.id
2808 group_name = label.name
2809 break
2810 else:
2811 group_name = ''
2812
2813 self._assign_new_group(chosen_entries, group_name=group_name)
2814 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002815
2816
2817 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002818 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002819 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2820 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002821
showardf1ae3542009-05-11 19:26:02 +00002822 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2823 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002824
2825
showardf1ae3542009-05-11 19:26:02 +00002826 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002827 for queue_entry in queue_entries:
2828 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002829 params = self._get_autoserv_params(queue_entries)
2830 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002831 cmd=params, group_name=group_name)
2832 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002833 entry_ids = [entry.id for entry in queue_entries]
2834
showard170873e2009-01-07 00:22:26 +00002835 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002836
2837
# Script entry point.
if __name__ == '__main__':
    main()