blob: d5295864ec4a8c6c7b5a0bfd9c87bba6f182b6ac [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Scheduler-wide constants and module-level state.
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Default to the checkout containing this file; AUTOTEST_DIR overrides.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names written into a queue entry's execution directory by the
# three process types the scheduler launches/recovers
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# Module-level mutable state, initialized by init()/main():
_db = None                # database_connection.DatabaseConnection, set in init()
_shutdown = False         # set True by handle_sigint() to stop the tick loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False     # set by --test; swaps in a dummy autoserv
_base_url = None          # AFE frontend URL, resolved in main
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# debug_scheduler.ini reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Scheduler entry point.

    Delegates to main_without_exception_handling() and logs any exception
    that escapes before re-raising it, so a crash always leaves a record in
    the scheduler log.
    """
    try:
        main_without_exception_handling()
    # Bare except is deliberate here: log everything that escapes, then
    # propagate it unchanged.
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
90def main_without_exception_handling():
jadmanski0afbb632008-06-06 21:10:57 +000091 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000092
jadmanski0afbb632008-06-06 21:10:57 +000093 parser = optparse.OptionParser(usage)
94 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
95 action='store_true')
96 parser.add_option('--logfile', help='Set a log file that all stdout ' +
97 'should be redirected to. Stderr will go to this ' +
98 'file + ".err"')
99 parser.add_option('--test', help='Indicate that scheduler is under ' +
100 'test and should use dummy autoserv and no parsing',
101 action='store_true')
102 (options, args) = parser.parse_args()
103 if len(args) != 1:
104 parser.print_usage()
105 return
mbligh36768f02008-02-22 18:28:33 +0000106
jadmanski0afbb632008-06-06 21:10:57 +0000107 global RESULTS_DIR
108 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000109
mbligh83c1e9e2009-05-01 23:10:41 +0000110 site_init = utils.import_site_function(__file__,
111 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
112 _site_init_monitor_db_dummy)
113 site_init()
114
showardcca334f2009-03-12 20:38:34 +0000115 # Change the cwd while running to avoid issues incase we were launched from
116 # somewhere odd (such as a random NFS home directory of the person running
117 # sudo to launch us as the appropriate user).
118 os.chdir(RESULTS_DIR)
119
jadmanski0afbb632008-06-06 21:10:57 +0000120 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000121 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
122 "notify_email_statuses",
123 default='')
showardc85c21b2008-11-24 22:17:37 +0000124 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000125 _notify_email_statuses = [status for status in
126 re.split(r'[\s,;:]', notify_statuses_list.lower())
127 if status]
showardc85c21b2008-11-24 22:17:37 +0000128
jadmanski0afbb632008-06-06 21:10:57 +0000129 if options.test:
130 global _autoserv_path
131 _autoserv_path = 'autoserv_dummy'
132 global _testing_mode
133 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000134
mbligh37eceaa2008-12-15 22:56:37 +0000135 # AUTOTEST_WEB.base_url is still a supported config option as some people
136 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000137 global _base_url
showard170873e2009-01-07 00:22:26 +0000138 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
139 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000140 if config_base_url:
141 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000142 else:
mbligh37eceaa2008-12-15 22:56:37 +0000143 # For the common case of everything running on a single server you
144 # can just set the hostname in a single place in the config file.
145 server_name = c.get_config_value('SERVER', 'hostname')
146 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000147 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000148 sys.exit(1)
149 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000150
showardc5afc462009-01-13 00:09:39 +0000151 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000152 server.start()
153
jadmanski0afbb632008-06-06 21:10:57 +0000154 try:
showardc5afc462009-01-13 00:09:39 +0000155 init(options.logfile)
156 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000157 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000158
jadmanski0afbb632008-06-06 21:10:57 +0000159 while not _shutdown:
160 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000161 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000162 except:
showard170873e2009-01-07 00:22:26 +0000163 email_manager.manager.log_stacktrace(
164 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000165
showard170873e2009-01-07 00:22:26 +0000166 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000167 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000168 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000169 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
172def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000173 global _shutdown
174 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000175 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
178def init(logfile):
jadmanski0afbb632008-06-06 21:10:57 +0000179 if logfile:
180 enable_logging(logfile)
showardb18134f2009-03-20 20:52:18 +0000181 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
182 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000183
mblighfb676032009-04-01 18:25:38 +0000184 utils.write_pid("monitor_db")
185
showardb1e51872008-10-07 11:08:18 +0000186 if _testing_mode:
187 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000188 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000189
jadmanski0afbb632008-06-06 21:10:57 +0000190 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
191 global _db
showard170873e2009-01-07 00:22:26 +0000192 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000193 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000194
showardfa8629c2008-11-04 16:51:23 +0000195 # ensure Django connection is in autocommit
196 setup_django_environment.enable_autocommit()
197
showardb18134f2009-03-20 20:52:18 +0000198 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000199 signal.signal(signal.SIGINT, handle_sigint)
200
showardd1ee1dd2009-01-07 21:33:08 +0000201 drones = global_config.global_config.get_config_value(
202 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
203 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000204 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000205 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000206 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
207
showardb18134f2009-03-20 20:52:18 +0000208 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
211def enable_logging(logfile):
jadmanski0afbb632008-06-06 21:10:57 +0000212 out_file = logfile
213 err_file = "%s.err" % logfile
showardb18134f2009-03-20 20:52:18 +0000214 logging.info("Enabling logging to %s (%s)", out_file, err_file)
jadmanski0afbb632008-06-06 21:10:57 +0000215 out_fd = open(out_file, "a", buffering=0)
216 err_fd = open(err_file, "a", buffering=0)
mbligh36768f02008-02-22 18:28:33 +0000217
jadmanski0afbb632008-06-06 21:10:57 +0000218 os.dup2(out_fd.fileno(), sys.stdout.fileno())
219 os.dup2(err_fd.fileno(), sys.stderr.fileno())
mbligh36768f02008-02-22 18:28:33 +0000220
jadmanski0afbb632008-06-06 21:10:57 +0000221 sys.stdout = out_fd
222 sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    command.extend(extra_args)
    return command
246
247
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
250
251
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() snapshots scheduling state from the database (ready hosts,
    job/host ACLs, label membership, job dependency labels) into in-memory
    dicts.  The find_eligible_* methods then consume that cached state,
    popping hosts from self._hosts_available as they are handed out within
    a single scheduling cycle.
    """
    def _get_ready_hosts(self):
        """Return {host id: Host} for unlocked hosts in a schedulable state
        (status NULL or 'Ready') with no active queue entry against them."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render id_list as a comma-separated string for a SQL IN clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (its %s filled with id_list) and build a mapping of
        first column -> set of second column values (reversed if flip)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left id: set(right ids)}; with
        flip=True the columns swap roles."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) set-valued dicts built
        from the hosts_labels table for the given hosts."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for this cycle.

        @param pending_queue_entries - HostQueueEntries whose jobs' ACLs,
                ineligible hosts and dependencies should be fetched.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover all the job's dependency labels."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine the ACL, dependency, only_if_needed and atomic-group
        checks for a single host/entry pair."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Return (and consume) the entry's specific host if eligible,
        else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick and consume an eligible host from the entry's metahost label,
        or return None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return an available Host for a non-atomic queue entry (specific
        host or metahost), or None.  The host is removed from the cache."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
601
602
showard170873e2009-01-07 00:22:26 +0000603class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000604 def __init__(self):
605 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000606 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000607 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000608 user_cleanup_time = scheduler_config.config.clean_interval
609 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
610 _db, user_cleanup_time)
611 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000612 self._host_agents = {}
613 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000614
mbligh36768f02008-02-22 18:28:33 +0000615
showard915958d2009-04-22 21:00:58 +0000616 def initialize(self, recover_hosts=True):
617 self._periodic_cleanup.initialize()
618 self._24hr_upkeep.initialize()
619
jadmanski0afbb632008-06-06 21:10:57 +0000620 # always recover processes
621 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000622
jadmanski0afbb632008-06-06 21:10:57 +0000623 if recover_hosts:
624 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000625
626
jadmanski0afbb632008-06-06 21:10:57 +0000627 def tick(self):
showard170873e2009-01-07 00:22:26 +0000628 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000629 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000630 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000631 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000632 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000633 self._schedule_new_jobs()
634 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000635 _drone_manager.execute_actions()
636 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000637
showard97aed502008-11-04 02:01:24 +0000638
mblighf3294cc2009-04-08 21:17:38 +0000639 def _run_cleanup(self):
640 self._periodic_cleanup.run_cleanup_maybe()
641 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000642
mbligh36768f02008-02-22 18:28:33 +0000643
showard170873e2009-01-07 00:22:26 +0000644 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
645 for object_id in object_ids:
646 agent_dict.setdefault(object_id, set()).add(agent)
647
648
649 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
650 for object_id in object_ids:
651 assert object_id in agent_dict
652 agent_dict[object_id].remove(agent)
653
654
jadmanski0afbb632008-06-06 21:10:57 +0000655 def add_agent(self, agent):
656 self._agents.append(agent)
657 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000658 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
659 self._register_agent_for_ids(self._queue_entry_agents,
660 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000661
showard170873e2009-01-07 00:22:26 +0000662
663 def get_agents_for_entry(self, queue_entry):
664 """
665 Find agents corresponding to the specified queue_entry.
666 """
showardd3dc1992009-04-22 21:01:40 +0000667 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000668
669
670 def host_has_agent(self, host):
671 """
672 Determine if there is currently an Agent present using this host.
673 """
674 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000675
676
jadmanski0afbb632008-06-06 21:10:57 +0000677 def remove_agent(self, agent):
678 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000679 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
680 agent)
681 self._unregister_agent_for_ids(self._queue_entry_agents,
682 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000683
684
    def _recover_processes(self):
        """Recover state after a scheduler restart.

        Order matters here: pidfiles must be registered before the drone
        refresh so their contents are fetched, and entries must be recovered
        before leftover hosts are reverified.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000695
showard170873e2009-01-07 00:22:26 +0000696
697 def _register_pidfiles(self):
698 # during recovery we may need to read pidfiles for both running and
699 # parsing entries
700 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000701 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000702 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000703 for pidfile_name in _ALL_PIDFILE_NAMES:
704 pidfile_id = _drone_manager.get_pidfile_id_from(
705 queue_entry.execution_tag(), pidfile_name=pidfile_name)
706 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000707
708
showardd3dc1992009-04-22 21:01:40 +0000709 def _recover_entries_with_status(self, status, orphans, pidfile_name,
710 recover_entries_fn):
711 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000712 for queue_entry in queue_entries:
713 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000714 # synchronous job we've already recovered
715 continue
showardd3dc1992009-04-22 21:01:40 +0000716 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000717 execution_tag = queue_entry.execution_tag()
718 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000719 run_monitor.attach_to_existing_process(execution_tag,
720 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000721
722 log_message = ('Recovering %s entry %s ' %
723 (status.lower(),
724 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000725 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000726 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000727 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000728 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000729 continue
mbligh90a549d2008-03-25 23:52:34 +0000730
showard597bfd32009-05-08 18:22:50 +0000731 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000732 run_monitor.get_process())
733 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
734 orphans.discard(run_monitor.get_process())
735
736
737 def _kill_remaining_orphan_processes(self, orphans):
738 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000739 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000740 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000741
showard170873e2009-01-07 00:22:26 +0000742
    def _recover_running_entries(self, orphans):
        """Re-attach to autoserv processes for entries stuck in Running."""
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = RecoveryQueueTask(job=job,
                                               queue_entries=queue_entries,
                                               run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            # else, _requeue_other_active_entries will cover this

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, '.autoserv_execute',
                                          recover_entries)
756
757
    def _recover_gathering_entries(self, orphans):
        """Re-attach to crashinfo-collection processes (Gathering status)."""
        def recover_entries(job, queue_entries, run_monitor):
            # run_monitor may be None; GatherLogsTask handles that case
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)
767
768
    def _recover_parsing_entries(self, orphans):
        """Re-attach to result-parser processes (Parsing status)."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            # parsing is throttled separately, so it counts as zero processes
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
778
779
    def _recover_all_recoverable_entries(self):
        """Recover Running/Gathering/Parsing entries, then kill orphans.

        Each recovery phase discards the processes it claims from 'orphans';
        whatever remains genuinely has no owner and is killed.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000786
showard97aed502008-11-04 02:01:24 +0000787
showard170873e2009-01-07 00:22:26 +0000788 def _requeue_other_active_entries(self):
789 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000790 where='active AND NOT complete AND '
791 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000792 for queue_entry in queue_entries:
793 if self.get_agents_for_entry(queue_entry):
794 # entry has already been recovered
795 continue
showardd3dc1992009-04-22 21:01:40 +0000796 if queue_entry.aborted:
797 queue_entry.abort(self)
798 continue
799
800 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000801 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000802 if queue_entry.host:
803 tasks = queue_entry.host.reverify_tasks()
804 self.add_agent(Agent(tasks))
805 agent = queue_entry.requeue()
806
807
    def _find_reverify(self):
        """Schedule reverification for hosts explicitly set to 'Reverify'."""
        self._reverify_hosts_where("status = 'Reverify'")
810
811
    def _reverify_remaining_hosts(self):
        """Reverify hosts left in transient states by a scheduler restart."""
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000827
828
jadmanski0afbb632008-06-06 21:10:57 +0000829 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000830 print_message='Reverifying host %s'):
831 full_where='locked = 0 AND invalid = 0 AND ' + where
832 for host in Host.fetch(where=full_where):
833 if self.host_has_agent(host):
834 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000835 continue
showard170873e2009-01-07 00:22:26 +0000836 if print_message:
showardb18134f2009-03-20 20:52:18 +0000837 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000838 tasks = host.reverify_tasks()
839 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000840
841
jadmanski0afbb632008-06-06 21:10:57 +0000842 def _recover_hosts(self):
843 # recover "Repair Failed" hosts
844 message = 'Reverifying dead host %s'
845 self._reverify_hosts_where("status = 'Repair Failed'",
846 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000847
848
showard04c82c52008-05-29 19:38:12 +0000849
    def _get_pending_queue_entries(self):
        """Return schedulable queue entries, highest priority first."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000856
857
showard89f84db2009-03-12 20:39:13 +0000858 def _refresh_pending_queue_entries(self):
859 """
860 Lookup the pending HostQueueEntries and call our HostScheduler
861 refresh() method given that list. Return the list.
862
863 @returns A list of pending HostQueueEntries sorted in priority order.
864 """
showard63a34772008-08-18 19:32:50 +0000865 queue_entries = self._get_pending_queue_entries()
866 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000867 return []
showardb95b1bd2008-08-15 18:11:04 +0000868
showard63a34772008-08-18 19:32:50 +0000869 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000870
showard89f84db2009-03-12 20:39:13 +0000871 return queue_entries
872
873
874 def _schedule_atomic_group(self, queue_entry):
875 """
876 Schedule the given queue_entry on an atomic group of hosts.
877
878 Returns immediately if there are insufficient available hosts.
879
880 Creates new HostQueueEntries based off of queue_entry for the
881 scheduled hosts and starts them all running.
882 """
883 # This is a virtual host queue entry representing an entire
884 # atomic group, find a group and schedule their hosts.
885 group_hosts = self._host_scheduler.find_eligible_atomic_group(
886 queue_entry)
887 if not group_hosts:
888 return
889 # The first assigned host uses the original HostQueueEntry
890 group_queue_entries = [queue_entry]
891 for assigned_host in group_hosts[1:]:
892 # Create a new HQE for every additional assigned_host.
893 new_hqe = HostQueueEntry.clone(queue_entry)
894 new_hqe.save()
895 group_queue_entries.append(new_hqe)
896 assert len(group_queue_entries) == len(group_hosts)
897 for queue_entry, host in itertools.izip(group_queue_entries,
898 group_hosts):
899 self._run_queue_entry(queue_entry, host)
900
901
902 def _schedule_new_jobs(self):
903 queue_entries = self._refresh_pending_queue_entries()
904 if not queue_entries:
905 return
906
showard63a34772008-08-18 19:32:50 +0000907 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000908 if (queue_entry.atomic_group_id is None or
909 queue_entry.host_id is not None):
910 assigned_host = self._host_scheduler.find_eligible_host(
911 queue_entry)
912 if assigned_host:
913 self._run_queue_entry(queue_entry, assigned_host)
914 else:
915 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000916
917
918 def _run_queue_entry(self, queue_entry, host):
919 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000920 # in some cases (synchronous jobs with run_verify=False), agent may be
921 # None
showard9976ce92008-10-15 20:28:13 +0000922 if agent:
923 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000924
925
jadmanski0afbb632008-06-06 21:10:57 +0000926 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000927 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
928 for agent in self.get_agents_for_entry(entry):
929 agent.abort()
930 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000931
932
showard324bf812009-01-20 23:23:38 +0000933 def _can_start_agent(self, agent, num_started_this_cycle,
934 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000935 # always allow zero-process agents to run
936 if agent.num_processes == 0:
937 return True
938 # don't allow any nonzero-process agents to run after we've reached a
939 # limit (this avoids starvation of many-process agents)
940 if have_reached_limit:
941 return False
942 # total process throttling
showard324bf812009-01-20 23:23:38 +0000943 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000944 return False
945 # if a single agent exceeds the per-cycle throttling, still allow it to
946 # run when it's the first agent in the cycle
947 if num_started_this_cycle == 0:
948 return True
949 # per-cycle throttling
950 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000951 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000952 return False
953 return True
954
955
    def _handle_agents(self):
        """Tick every agent, removing finished ones and throttling starts."""
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                # not-yet-started agent: consult the throttles before ticking
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000974
975
showard29f7cd22009-04-29 21:16:24 +0000976 def _process_recurring_runs(self):
977 recurring_runs = models.RecurringRun.objects.filter(
978 start_date__lte=datetime.datetime.now())
979 for rrun in recurring_runs:
980 # Create job from template
981 job = rrun.job
982 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000983 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000984
985 host_objects = info['hosts']
986 one_time_hosts = info['one_time_hosts']
987 metahost_objects = info['meta_hosts']
988 dependencies = info['dependencies']
989 atomic_group = info['atomic_group']
990
991 for host in one_time_hosts or []:
992 this_host = models.Host.create_one_time_host(host.hostname)
993 host_objects.append(this_host)
994
995 try:
996 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000997 options=options,
showard29f7cd22009-04-29 21:16:24 +0000998 host_objects=host_objects,
999 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001000 atomic_group=atomic_group)
1001
1002 except Exception, ex:
1003 logging.exception(ex)
1004 #TODO send email
1005
1006 if rrun.loop_count == 1:
1007 rrun.delete()
1008 else:
1009 if rrun.loop_count != 0: # if not infinite loop
1010 # calculate new start_date
1011 difference = datetime.timedelta(seconds=rrun.loop_period)
1012 rrun.start_date = rrun.start_date + difference
1013 rrun.loop_count -= 1
1014 rrun.save()
1015
1016
class PidfileRunMonitor(object):
    """
    Monitors a process through its pidfile, via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process writing a usable pidfile
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with 'nice -n <level>'; no-op for a falsy level."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command through the drone manager and start monitoring it."""
        assert command is not None
        # (fixed: previously duplicated _add_nice_command inline; the helper
        # also skips nice_level=0, which is equivalent to no renicing)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process found via its pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been found for this pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; a process must exist."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raises _PidfileException on
        invalid contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log + email the pidfile problem and mark the process as lost."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # (fixed: the bound exception variable was unused)
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001174
1175
class Agent(object):
    """Runs a FIFO queue of AgentTasks on behalf of the dispatcher.

    Records the union of its tasks' host ids and queue entry ids so the
    dispatcher can index agents by host and queue entry.
    """
    def __init__(self, tasks, num_processes=1):
        # task currently being polled; None between tasks
        self.active_task = None
        self.queue = None
        # set by Dispatcher.add_agent()
        self.dispatcher = None
        # process count used for dispatcher throttling
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # task gains a back-reference so it can reach the dispatcher
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Poll the active task, advancing through the queue as tasks finish."""
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire the finished active task and pull the next one, if any."""
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()
        self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        """Drop remaining tasks and hand the failure tasks to a fresh Agent."""
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1252
showardd3dc1992009-04-22 21:01:40 +00001253
mbligh36768f02008-02-22 18:28:33 +00001254class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001255 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1256 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001257 self.done = False
1258 self.failure_tasks = failure_tasks
1259 self.started = False
1260 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001261 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001262 self.task = None
1263 self.agent = None
1264 self.monitor = None
1265 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001266 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001267 self.queue_entry_ids = []
1268 self.host_ids = []
1269 self.log_file = None
1270
1271
1272 def _set_ids(self, host=None, queue_entries=None):
1273 if queue_entries and queue_entries != [None]:
1274 self.host_ids = [entry.host.id for entry in queue_entries]
1275 self.queue_entry_ids = [entry.id for entry in queue_entries]
1276 else:
1277 assert host
1278 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001279
1280
jadmanski0afbb632008-06-06 21:10:57 +00001281 def poll(self):
showard08a36412009-05-05 01:01:13 +00001282 if not self.started:
1283 self.start()
1284 self.tick()
1285
1286
1287 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001288 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001289 exit_code = self.monitor.exit_code()
1290 if exit_code is None:
1291 return
1292 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001293 else:
1294 success = False
mbligh36768f02008-02-22 18:28:33 +00001295
jadmanski0afbb632008-06-06 21:10:57 +00001296 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001297
1298
jadmanski0afbb632008-06-06 21:10:57 +00001299 def is_done(self):
1300 return self.done
mbligh36768f02008-02-22 18:28:33 +00001301
1302
jadmanski0afbb632008-06-06 21:10:57 +00001303 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001304 if self.done:
1305 return
jadmanski0afbb632008-06-06 21:10:57 +00001306 self.done = True
1307 self.success = success
1308 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001309
1310
jadmanski0afbb632008-06-06 21:10:57 +00001311 def prolog(self):
1312 pass
mblighd64e5702008-04-04 21:39:28 +00001313
1314
jadmanski0afbb632008-06-06 21:10:57 +00001315 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001316 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001317
mbligh36768f02008-02-22 18:28:33 +00001318
jadmanski0afbb632008-06-06 21:10:57 +00001319 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001320 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001321 _drone_manager.copy_to_results_repository(
1322 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001323
1324
jadmanski0afbb632008-06-06 21:10:57 +00001325 def epilog(self):
1326 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001327
1328
jadmanski0afbb632008-06-06 21:10:57 +00001329 def start(self):
1330 assert self.agent
1331
1332 if not self.started:
1333 self.prolog()
1334 self.run()
1335
1336 self.started = True
1337
1338
1339 def abort(self):
1340 if self.monitor:
1341 self.monitor.kill()
1342 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001343 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001344 self.cleanup()
1345
1346
showard170873e2009-01-07 00:22:26 +00001347 def set_host_log_file(self, base_name, host):
1348 filename = '%s.%s' % (time.time(), base_name)
1349 self.log_file = os.path.join('hosts', host.hostname, filename)
1350
1351
showardde634ee2009-01-30 01:44:24 +00001352 def _get_consistent_execution_tag(self, queue_entries):
1353 first_execution_tag = queue_entries[0].execution_tag()
1354 for queue_entry in queue_entries[1:]:
1355 assert queue_entry.execution_tag() == first_execution_tag, (
1356 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1357 queue_entry,
1358 first_execution_tag,
1359 queue_entries[0]))
1360 return first_execution_tag
1361
1362
showarda1e74b32009-05-12 17:32:04 +00001363 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001364 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001365 if use_monitor is None:
1366 assert self.monitor
1367 use_monitor = self.monitor
1368 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001369 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001370 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001371 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001372 results_path)
showardde634ee2009-01-30 01:44:24 +00001373
showarda1e74b32009-05-12 17:32:04 +00001374
1375 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001376 reparse_task = FinalReparseTask(queue_entries)
1377 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1378
1379
showarda1e74b32009-05-12 17:32:04 +00001380 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1381 self._copy_results(queue_entries, use_monitor)
1382 self._parse_results(queue_entries)
1383
1384
showardd3dc1992009-04-22 21:01:40 +00001385 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001386 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001387 self.monitor = PidfileRunMonitor()
1388 self.monitor.run(self.cmd, self._working_directory,
1389 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001390 log_file=self.log_file,
1391 pidfile_name=pidfile_name,
1392 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001393
1394
showardd9205182009-04-27 20:09:55 +00001395class TaskWithJobKeyvals(object):
1396 """AgentTask mixin providing functionality to help with job keyval files."""
1397 _KEYVAL_FILE = 'keyval'
1398 def _format_keyval(self, key, value):
1399 return '%s=%s' % (key, value)
1400
1401
1402 def _keyval_path(self):
1403 """Subclasses must override this"""
1404 raise NotImplemented
1405
1406
1407 def _write_keyval_after_job(self, field, value):
1408 assert self.monitor
1409 if not self.monitor.has_process():
1410 return
1411 _drone_manager.write_lines_to_file(
1412 self._keyval_path(), [self._format_keyval(field, value)],
1413 paired_with_process=self.monitor.get_process())
1414
1415
1416 def _job_queued_keyval(self, job):
1417 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1418
1419
1420 def _write_job_finished(self):
1421 self._write_keyval_after_job("job_finished", int(time.time()))
1422
1423
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Repairs a host by running autoserv with -R.

    If a queue entry was supplied and the repair fails, the entry is marked
    failed via _fail_queue_entry() -- unless it is a metahost entry or has
    already left the 'Queued' state.
    """
    def __init__(self, host, queue_entry=None):
        """\
        host: the host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair output goes to a temporary dir; it is only copied into the
        # entry's results if the repair fails (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        # requeue the entry right away so it can be rescheduled on another
        # host while this one is being repaired
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in our temporary results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail out self.queue_entry_to_fail after an unsuccessful repair.

        Writes job_queued/job_finished keyvals, copies the repair logs into
        the entry's execution directory, optionally schedules a reparse of
        those results, and finally hands the failure to the entry.
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # parsing the failed-repair results is a per-job option
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        # set the host's terminal status; on failure also fail the entry
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001494
1495
showard8fe93b52008-11-18 17:53:22 +00001496class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001497 def epilog(self):
1498 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001499 should_copy_results = (self.queue_entry and not self.success
1500 and not self.queue_entry.meta_host)
1501 if should_copy_results:
1502 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001503 destination = os.path.join(self.queue_entry.execution_tag(),
1504 os.path.basename(self.log_file))
1505 _drone_manager.copy_to_results_repository(
1506 self.monitor.get_process(), self.log_file,
1507 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001508
1509
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host to check it is usable.

    Constructed with exactly one of queue_entry/host.  On failure a
    RepairTask is scheduled; on success the host is marked Ready.
    """
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host may be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        # if verify fails, try to repair the host before giving up
        on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001540
1541
showardd9205182009-04-27 20:09:55 +00001542class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001543 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001544 self.job = job
1545 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001546 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001547 super(QueueTask, self).__init__(cmd, self._execution_tag())
1548 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001549
1550
showard73ec0442009-02-07 02:05:20 +00001551 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001552 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001553
1554
1555 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1556 keyval_contents = '\n'.join(self._format_keyval(key, value)
1557 for key, value in keyval_dict.iteritems())
1558 # always end with a newline to allow additional keyvals to be written
1559 keyval_contents += '\n'
1560 _drone_manager.attach_file_to_execution(self._execution_tag(),
1561 keyval_contents,
1562 file_path=keyval_path)
1563
1564
1565 def _write_keyvals_before_job(self, keyval_dict):
1566 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1567
1568
showard170873e2009-01-07 00:22:26 +00001569 def _write_host_keyvals(self, host):
1570 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1571 host.hostname)
1572 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001573 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1574 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001575
1576
showard170873e2009-01-07 00:22:26 +00001577 def _execution_tag(self):
1578 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001579
1580
jadmanski0afbb632008-06-06 21:10:57 +00001581 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001582 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001583 keyval_dict = {queued_key: queued_time}
1584 if self.group_name:
1585 keyval_dict['host_group_name'] = self.group_name
1586 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001587 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001588 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001589 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001590 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001591 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001592 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001593 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001594 assert len(self.queue_entries) == 1
1595 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001596
1597
showard35162b02009-03-03 02:17:30 +00001598 def _write_lost_process_error_file(self):
1599 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1600 _drone_manager.write_lines_to_file(error_file_path,
1601 [_LOST_PROCESS_ERROR])
1602
1603
showardd3dc1992009-04-22 21:01:40 +00001604 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001605 if not self.monitor:
1606 return
1607
showardd9205182009-04-27 20:09:55 +00001608 self._write_job_finished()
1609
showardd3dc1992009-04-22 21:01:40 +00001610 # both of these conditionals can be true, iff the process ran, wrote a
1611 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001612 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001613 gather_task = GatherLogsTask(self.job, self.queue_entries)
1614 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001615
1616 if self.monitor.lost_process:
1617 self._write_lost_process_error_file()
1618 for queue_entry in self.queue_entries:
1619 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001620
1621
showardcbd74612008-11-19 21:42:02 +00001622 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001623 _drone_manager.write_lines_to_file(
1624 os.path.join(self._execution_tag(), 'status.log'),
1625 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001626 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001627
1628
jadmanskif7fa2cc2008-10-01 14:13:23 +00001629 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001630 if not self.monitor or not self.monitor.has_process():
1631 return
1632
jadmanskif7fa2cc2008-10-01 14:13:23 +00001633 # build up sets of all the aborted_by and aborted_on values
1634 aborted_by, aborted_on = set(), set()
1635 for queue_entry in self.queue_entries:
1636 if queue_entry.aborted_by:
1637 aborted_by.add(queue_entry.aborted_by)
1638 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1639 aborted_on.add(t)
1640
1641 # extract some actual, unique aborted by value and write it out
1642 assert len(aborted_by) <= 1
1643 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001644 aborted_by_value = aborted_by.pop()
1645 aborted_on_value = max(aborted_on)
1646 else:
1647 aborted_by_value = 'autotest_system'
1648 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001649
showarda0382352009-02-11 23:36:43 +00001650 self._write_keyval_after_job("aborted_by", aborted_by_value)
1651 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001652
showardcbd74612008-11-19 21:42:02 +00001653 aborted_on_string = str(datetime.datetime.fromtimestamp(
1654 aborted_on_value))
1655 self._write_status_comment('Job aborted by %s on %s' %
1656 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001657
1658
jadmanski0afbb632008-06-06 21:10:57 +00001659 def abort(self):
1660 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001661 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001662 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001663
1664
jadmanski0afbb632008-06-06 21:10:57 +00001665 def epilog(self):
1666 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001667 self._finish_task()
1668 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001669
1670
mblighbb421852008-03-11 22:36:16 +00001671class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001672 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001673 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001674 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001675
1676
jadmanski0afbb632008-06-06 21:10:57 +00001677 def run(self):
1678 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001679
1680
jadmanski0afbb632008-06-06 21:10:57 +00001681 def prolog(self):
1682 # recovering an existing process - don't do prolog
1683 pass
mblighbb421852008-03-11 22:36:16 +00001684
1685
showardd3dc1992009-04-22 21:01:40 +00001686class PostJobTask(AgentTask):
1687 def __init__(self, queue_entries, pidfile_name, logfile_name,
1688 run_monitor=None):
1689 """
1690 If run_monitor != None, we're recovering a running task.
1691 """
1692 self._queue_entries = queue_entries
1693 self._pidfile_name = pidfile_name
1694 self._run_monitor = run_monitor
1695
1696 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1697 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1698 self._autoserv_monitor = PidfileRunMonitor()
1699 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1700 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1701
1702 if _testing_mode:
1703 command = 'true'
1704 else:
1705 command = self._generate_command(self._results_dir)
1706
1707 super(PostJobTask, self).__init__(cmd=command,
1708 working_directory=self._execution_tag)
1709
1710 self.log_file = os.path.join(self._execution_tag, logfile_name)
1711 self._final_status = self._determine_final_status()
1712
1713
1714 def _generate_command(self, results_dir):
1715 raise NotImplementedError('Subclasses must override this')
1716
1717
1718 def _job_was_aborted(self):
1719 was_aborted = None
1720 for queue_entry in self._queue_entries:
1721 queue_entry.update_from_database()
1722 if was_aborted is None: # first queue entry
1723 was_aborted = bool(queue_entry.aborted)
1724 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1725 email_manager.manager.enqueue_notify_email(
1726 'Inconsistent abort state',
1727 'Queue entries have inconsistent abort state: ' +
1728 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1729 # don't crash here, just assume true
1730 return True
1731 return was_aborted
1732
1733
1734 def _determine_final_status(self):
1735 if self._job_was_aborted():
1736 return models.HostQueueEntry.Status.ABORTED
1737
1738 # we'll use a PidfileRunMonitor to read the autoserv exit status
1739 if self._autoserv_monitor.exit_code() == 0:
1740 return models.HostQueueEntry.Status.COMPLETED
1741 return models.HostQueueEntry.Status.FAILED
1742
1743
1744 def run(self):
1745 if self._run_monitor is not None:
1746 self.monitor = self._run_monitor
1747 else:
1748 # make sure we actually have results to work with.
1749 # this should never happen in normal operation.
1750 if not self._autoserv_monitor.has_process():
1751 email_manager.manager.enqueue_notify_email(
1752 'No results in post-job task',
1753 'No results in post-job task at %s' %
1754 self._autoserv_monitor.pidfile_id)
1755 self.finished(False)
1756 return
1757
1758 super(PostJobTask, self).run(
1759 pidfile_name=self._pidfile_name,
1760 paired_with_pidfile=self._paired_with_pidfile)
1761
1762
1763 def _set_all_statuses(self, status):
1764 for queue_entry in self._queue_entries:
1765 queue_entry.set_status(status)
1766
1767
1768 def abort(self):
1769 # override AgentTask.abort() to avoid killing the process and ending
1770 # the task. post-job tasks continue when the job is aborted.
1771 pass
1772
1773
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv --collect-crashinfo against all of the entries' hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Schedule a CleanupTask for each host if the job's reboot_after
        policy (or an abort) calls for it; otherwise mark the hosts Ready."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy + reparse results only if autoserv actually produced some
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001841
1842
showard8fe93b52008-11-18 17:53:22 +00001843class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001844 def __init__(self, host=None, queue_entry=None):
1845 assert bool(host) ^ bool(queue_entry)
1846 if queue_entry:
1847 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001848 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001849 self.host = host
showard170873e2009-01-07 00:22:26 +00001850
1851 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001852 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1853 ['--cleanup'],
1854 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001855 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001856 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1857 failure_tasks=[repair_task])
1858
1859 self._set_ids(host=host, queue_entries=[queue_entry])
1860 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001861
mblighd5c95802008-03-05 00:33:46 +00001862
jadmanski0afbb632008-06-06 21:10:57 +00001863 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001864 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001865 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001866 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001867
mblighd5c95802008-03-05 00:33:46 +00001868
showard21baa452008-10-21 00:08:39 +00001869 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001870 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001871 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001872 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001873 self.host.update_field('dirty', 0)
1874
1875
showardd3dc1992009-04-22 21:01:40 +00001876class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001877 _num_running_parses = 0
1878
showardd3dc1992009-04-22 21:01:40 +00001879 def __init__(self, queue_entries, run_monitor=None):
1880 super(FinalReparseTask, self).__init__(queue_entries,
1881 pidfile_name=_PARSER_PID_FILE,
1882 logfile_name='.parse.log',
1883 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001884 # don't use _set_ids, since we don't want to set the host_ids
1885 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001886 self._parse_started = False
1887
showard97aed502008-11-04 02:01:24 +00001888
1889 @classmethod
1890 def _increment_running_parses(cls):
1891 cls._num_running_parses += 1
1892
1893
1894 @classmethod
1895 def _decrement_running_parses(cls):
1896 cls._num_running_parses -= 1
1897
1898
1899 @classmethod
1900 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001901 return (cls._num_running_parses <
1902 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001903
1904
1905 def prolog(self):
1906 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001907 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001908
1909
1910 def epilog(self):
1911 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001912 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001913
1914
showardd3dc1992009-04-22 21:01:40 +00001915 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00001916 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00001917 results_dir]
showard97aed502008-11-04 02:01:24 +00001918
1919
showard08a36412009-05-05 01:01:13 +00001920 def tick(self):
1921 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001922 # and we can, at which point we revert to default behavior
1923 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001924 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001925 else:
1926 self._try_starting_parse()
1927
1928
1929 def run(self):
1930 # override run() to not actually run unless we can
1931 self._try_starting_parse()
1932
1933
1934 def _try_starting_parse(self):
1935 if not self._can_run_new_parse():
1936 return
showard170873e2009-01-07 00:22:26 +00001937
showard97aed502008-11-04 02:01:24 +00001938 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001939 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001940
showard97aed502008-11-04 02:01:24 +00001941 self._increment_running_parses()
1942 self._parse_started = True
1943
1944
1945 def finished(self, success):
1946 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001947 if self._parse_started:
1948 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001949
1950
showardc9ae1782009-01-30 01:42:37 +00001951class SetEntryPendingTask(AgentTask):
1952 def __init__(self, queue_entry):
1953 super(SetEntryPendingTask, self).__init__(cmd='')
1954 self._queue_entry = queue_entry
1955 self._set_ids(queue_entries=[queue_entry])
1956
1957
1958 def run(self):
1959 agent = self._queue_entry.on_pending()
1960 if agent:
1961 self.agent.dispatcher.add_agent(agent)
1962 self.finished(True)
1963
1964
showarda3c58572009-03-12 20:36:59 +00001965class DBError(Exception):
1966 """Raised by the DBObject constructor when its select fails."""
1967
1968
mbligh36768f02008-02-22 18:28:33 +00001969class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001970 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001971
1972 # Subclasses MUST override these:
1973 _table_name = ''
1974 _fields = ()
1975
showarda3c58572009-03-12 20:36:59 +00001976 # A mapping from (type, id) to the instance of the object for that
1977 # particular id. This prevents us from creating new Job() and Host()
1978 # instances for every HostQueueEntry object that we instantiate as
1979 # multiple HQEs often share the same Job.
1980 _instances_by_type_and_id = weakref.WeakValueDictionary()
1981 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001982
showarda3c58572009-03-12 20:36:59 +00001983
1984 def __new__(cls, id=None, **kwargs):
1985 """
1986 Look to see if we already have an instance for this particular type
1987 and id. If so, use it instead of creating a duplicate instance.
1988 """
1989 if id is not None:
1990 instance = cls._instances_by_type_and_id.get((cls, id))
1991 if instance:
1992 return instance
1993 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1994
1995
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """Construct from either a database id (triggers a SELECT) or a
        pre-fetched row.

        Note that, because of the caching in __new__, self may be an
        already-initialized instance -- hence the _initialized checks.

        @param id - primary key to look up; mutually exclusive with row.
        @param row - a full row, ordered like the _fields class attribute.
        @param new_record - True for an object not yet stored in the db.
        @param always_query - if False and this cached instance was already
                initialized, skip re-reading the database entirely.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # warn when a requery changes values we already held in memory
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2024
2025
2026 @classmethod
2027 def _clear_instance_cache(cls):
2028 """Used for testing, clear the internal instance cache."""
2029 cls._instances_by_type_and_id.clear()
2030
2031
showardccbd6c52009-03-21 00:10:21 +00002032 def _fetch_row_from_db(self, row_id):
2033 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2034 rows = _db.execute(sql, (row_id,))
2035 if not rows:
showard76e29d12009-04-15 21:53:10 +00002036 raise DBError("row not found (table=%s, row id=%s)"
2037 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002038 return rows[0]
2039
2040
showarda3c58572009-03-12 20:36:59 +00002041 def _assert_row_length(self, row):
2042 assert len(row) == len(self._fields), (
2043 "table = %s, row = %s/%d, fields = %s/%d" % (
2044 self.__table, row, len(row), self._fields, len(self._fields)))
2045
2046
2047 def _compare_fields_in_row(self, row):
2048 """
2049 Given a row as returned by a SELECT query, compare it to our existing
2050 in memory fields.
2051
2052 @param row - A sequence of values corresponding to fields named in
2053 The class attribute _fields.
2054
2055 @returns A dictionary listing the differences keyed by field name
2056 containing tuples of (current_value, row_value).
2057 """
2058 self._assert_row_length(row)
2059 differences = {}
2060 for field, row_value in itertools.izip(self._fields, row):
2061 current_value = getattr(self, field)
2062 if current_value != row_value:
2063 differences[field] = (current_value, row_value)
2064 return differences
showard2bab8f42008-11-12 18:15:22 +00002065
2066
2067 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002068 """
2069 Update our field attributes using a single row returned by SELECT.
2070
2071 @param row - A sequence of values corresponding to fields named in
2072 the class fields list.
2073 """
2074 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002075
showard2bab8f42008-11-12 18:15:22 +00002076 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002077 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002078 setattr(self, field, value)
2079 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002080
showard2bab8f42008-11-12 18:15:22 +00002081 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002082
mblighe2586682008-02-29 22:45:46 +00002083
showardccbd6c52009-03-21 00:10:21 +00002084 def update_from_database(self):
2085 assert self.id is not None
2086 row = self._fetch_row_from_db(self.id)
2087 self._update_fields_from_row(row)
2088
2089
jadmanski0afbb632008-06-06 21:10:57 +00002090 def count(self, where, table = None):
2091 if not table:
2092 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002093
jadmanski0afbb632008-06-06 21:10:57 +00002094 rows = _db.execute("""
2095 SELECT count(*) FROM %s
2096 WHERE %s
2097 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002098
jadmanski0afbb632008-06-06 21:10:57 +00002099 assert len(rows) == 1
2100
2101 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002102
2103
showardd3dc1992009-04-22 21:01:40 +00002104 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002105 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002106
showard2bab8f42008-11-12 18:15:22 +00002107 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002108 return
mbligh36768f02008-02-22 18:28:33 +00002109
mblighf8c624d2008-07-03 16:58:45 +00002110 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002111 _db.execute(query, (value, self.id))
2112
showard2bab8f42008-11-12 18:15:22 +00002113 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002114
2115
jadmanski0afbb632008-06-06 21:10:57 +00002116 def save(self):
2117 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002118 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002119 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002120 values = []
2121 for key in keys:
2122 value = getattr(self, key)
2123 if value is None:
2124 values.append('NULL')
2125 else:
2126 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002127 values_str = ','.join(values)
2128 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2129 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002130 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002131 # Update our id to the one the database just assigned to us.
2132 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002133
2134
jadmanski0afbb632008-06-06 21:10:57 +00002135 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002136 self._instances_by_type_and_id.pop((type(self), id), None)
2137 self._initialized = False
2138 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002139 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2140 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002141
2142
showard63a34772008-08-18 19:32:50 +00002143 @staticmethod
2144 def _prefix_with(string, prefix):
2145 if string:
2146 string = prefix + string
2147 return string
2148
2149
jadmanski0afbb632008-06-06 21:10:57 +00002150 @classmethod
showard989f25d2008-10-01 11:38:11 +00002151 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002152 """
2153 Construct instances of our class based on the given database query.
2154
2155 @yields One class instance for each row fetched.
2156 """
showard63a34772008-08-18 19:32:50 +00002157 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2158 where = cls._prefix_with(where, 'WHERE ')
2159 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002160 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002161 'joins' : joins,
2162 'where' : where,
2163 'order_by' : order_by})
2164 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002165 for row in rows:
2166 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002167
mbligh36768f02008-02-22 18:28:33 +00002168
class IneligibleHostQueue(DBObject):
    """A job/host pair blocked from being scheduled together again
    (created by HostQueueEntry.block_host, removed by unblock_host)."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002172
2173
class AtomicGroup(DBObject):
    """A named group of machines scheduled together as a unit, up to
    max_number_of_machines at a time."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002178
2179
class Label(DBObject):
    """A host label; may mark a platform and may belong to an atomic
    group (atomic_group_id)."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002184
2185
class Host(DBObject):
    """A machine known to the scheduler; wraps a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return the active, incomplete HostQueueEntry this host is
        working on, or None if it is idle.  At most one may exist."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            assert len(rows) == 1
            results = rows[0];
            return HostQueueEntry(row=results)


    def yield_work(self):
        """Requeue whatever entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self,status):
        """Record a new status for this host in the database (and log it)."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # NOTE(review): if multiple platform labels exist, the last
                # one (alphabetically) wins.
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] for this host, marking it
        'Cleaning' immediately so the scheduler won't hand it out."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    # Matches hostnames of the form <letters-or-dashes><digits>, e.g.
    # 'host007'; used by cmp_for_sort below.
    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            number_a = int(number_a_str.lstrip('0'))
            number_b = int(number_b_str.lstrip('0'))
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2282
2283
class HostQueueEntry(DBObject):
    """One host's slot in a job; wraps a row of host_queue_entries."""
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        # One of `id` (fetch from DB) or `row` (pre-fetched values) is
        # required; DBObject handles the rest.
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Scheduling decisions for this entry are logged here, under the
        # job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """URL of the frontend's view-job page for this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label).  The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign a host to this entry (or release it when host is None),
        maintaining the ineligible-host block and the queue log."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently-assigned Host, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Mark host_id ineligible to be chosen again for this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks between this job and host_id."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Hostname for display purposes; 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Set this entry's status, keeping the active/complete flags in
        sync and sending any configured status/completion emails."""
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Not yet running (or being re-parsed): neither active nor complete.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # In progress: active but not complete.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states: complete and no longer active.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Once the whole job has finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """Start this entry running.

        @param assigned_host - required for meta-host or atomic-group
                entries, which receive their concrete host here.
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the queue to be scheduled afresh."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first lookup via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """Abort this entry, releasing or re-verifying its host as needed.
        Defers (does nothing) while a post-job agent is still running."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
            and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Results path fragment: '<job id>-<owner>/<execution subdir>'."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002557
2558
class Job(DBObject):
    """A job to be run; wraps a row of the jobs table."""
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002565
2566
    def __init__(self, id=None, row=None, **kwargs):
        # One of `id` (fetch from DB) or `row` (pre-fetched values) is
        # required; DBObject handles the rest.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002570
mblighe2586682008-02-29 22:45:46 +00002571
jadmanski0afbb632008-06-06 21:10:57 +00002572 def is_server_job(self):
2573 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002574
2575
showard170873e2009-01-07 00:22:26 +00002576 def tag(self):
2577 return "%s-%s" % (self.id, self.owner)
2578
2579
jadmanski0afbb632008-06-06 21:10:57 +00002580 def get_host_queue_entries(self):
2581 rows = _db.execute("""
2582 SELECT * FROM host_queue_entries
2583 WHERE job_id= %s
2584 """, (self.id,))
2585 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002586
jadmanski0afbb632008-06-06 21:10:57 +00002587 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002588
jadmanski0afbb632008-06-06 21:10:57 +00002589 return entries
mbligh36768f02008-02-22 18:28:33 +00002590
2591
jadmanski0afbb632008-06-06 21:10:57 +00002592 def set_status(self, status, update_queues=False):
2593 self.update_field('status',status)
2594
2595 if update_queues:
2596 for queue_entry in self.get_host_queue_entries():
2597 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002598
2599
jadmanski0afbb632008-06-06 21:10:57 +00002600 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002601 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2602 status='Pending')
2603 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002604
2605
jadmanski0afbb632008-06-06 21:10:57 +00002606 def num_machines(self, clause = None):
2607 sql = "job_id=%s" % self.id
2608 if clause:
2609 sql += " AND (%s)" % clause
2610 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002611
2612
    def num_queued(self):
        """Number of this job's entries that have not yet completed."""
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002615
2616
    def num_active(self):
        """Number of this job's entries that are currently active."""
        return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002619
2620
    def num_complete(self):
        """Number of this job's entries that have completed."""
        return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002623
2624
    def is_finished(self):
        """True once every queue entry of this job has completed."""
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002627
mbligh36768f02008-02-22 18:28:33 +00002628
showard6bb7c292009-01-30 01:44:51 +00002629 def _not_yet_run_entries(self, include_verifying=True):
2630 statuses = [models.HostQueueEntry.Status.QUEUED,
2631 models.HostQueueEntry.Status.PENDING]
2632 if include_verifying:
2633 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2634 return models.HostQueueEntry.objects.filter(job=self.id,
2635 status__in=statuses)
2636
2637
    def _stop_all_entries(self):
        """Stop every not-yet-run entry of this job.

        Pending entries release their hosts back to Ready; every stopped
        entry gets status Stopped.  Verifying entries are left alone --
        their post-verify handling notices the stop.
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
2651
showard2bab8f42008-11-12 18:15:22 +00002652 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002653 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002654 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002655 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002656
2657
jadmanski0afbb632008-06-06 21:10:57 +00002658 def write_to_machines_file(self, queue_entry):
2659 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002660 file_path = os.path.join(self.tag(), '.machines')
2661 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002662
2663
    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results.

        @param group_name - optional prefix (e.g. a label name), sanitized
                for use as a path component.  The result is
                '<prefix.>group<N>' where N is one greater than the highest
                existing group number among this job's execution subdirs
                (or 0 if none).
        """
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002684
2685
showard170873e2009-01-07 00:22:26 +00002686 def _write_control_file(self, execution_tag):
2687 control_path = _drone_manager.attach_file_to_execution(
2688 execution_tag, self.control_file)
2689 return control_path
mbligh36768f02008-02-22 18:28:33 +00002690
showardb2e2c322008-10-14 17:33:55 +00002691
showard2bab8f42008-11-12 18:15:22 +00002692 def get_group_entries(self, queue_entry_from_group):
2693 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002694 return list(HostQueueEntry.fetch(
2695 where='job_id=%s AND execution_subdir=%s',
2696 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002697
2698
    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line arguments for these entries.

        @param queue_entries - non-empty list of HostQueueEntry sharing one
                execution tag; their hostnames form autoserv's machine list.
        @returns list of command-line argument strings.
        """
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        # '-c' tells autoserv this is a client-side control file.
        if not self.is_server_job():
            params.append('-c')

        return params
mblighe2586682008-02-29 22:45:46 +00002716
mbligh36768f02008-02-22 18:28:33 +00002717
showardc9ae1782009-01-30 01:42:37 +00002718 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002719 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002720 return True
showard0fc38302008-10-23 00:44:07 +00002721 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002722 return queue_entry.get_host().dirty
2723 return False
showard21baa452008-10-21 00:08:39 +00002724
showardc9ae1782009-01-30 01:42:37 +00002725
2726 def _should_run_verify(self, queue_entry):
2727 do_not_verify = (queue_entry.host.protection ==
2728 host_protections.Protection.DO_NOT_VERIFY)
2729 if do_not_verify:
2730 return False
2731 return self.run_verify
2732
2733
    def _get_pre_job_tasks(self, queue_entry):
        """Tasks to run before the job proper: optional cleanup, optional
        verify, and finally marking the entry Pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks
2742
2743
showardf1ae3542009-05-11 19:26:02 +00002744 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002745 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002746 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002747 else:
showardf1ae3542009-05-11 19:26:02 +00002748 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002749 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002750 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002751 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002752
2753 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002754 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002755
2756
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the group of queue entries that will run this job together.

        Starts from include_queue_entry and, if more entries are wanted
        (up to the atomic group's machine limit, or synch_count otherwise),
        fills the group from this job's Pending entries, preferring hosts
        that sort first by hostname.  The chosen entries are assigned a new
        execution group as a side effect.

        @param include_queue_entry: HostQueueEntry that must be part of the
                group.

        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, and a string group name to suggest
                giving to this job in the results database ('' when no
                atomic group is involved).
        """
        if include_queue_entry.atomic_group_id:
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # An atomic group caps the group size by its machine limit;
        # otherwise the job's synch_count decides.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        # include_queue_entry already counts toward the total.
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the pending entries by hostname before slicing, so the
            # chosen subset is deterministic.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002806
2807
2808 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002809 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002810 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2811 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002812
showardf1ae3542009-05-11 19:26:02 +00002813 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2814 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002815
2816
showardf1ae3542009-05-11 19:26:02 +00002817 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002818 for queue_entry in queue_entries:
2819 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002820 params = self._get_autoserv_params(queue_entries)
2821 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002822 cmd=params, group_name=group_name)
2823 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002824 entry_ids = [entry.id for entry in queue_entries]
2825
showard170873e2009-01-07 00:22:26 +00002826 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002827
2828
if __name__ == '__main__':
    # Script entry point: run the scheduler's main loop (main() is defined
    # earlier in this file, outside this view).
    main()