blob: 6b4ea511faf5701af53e0d31e385c324f4c25ebb [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Directory results are written to; overwritten with the results_dir
# command-line argument in main_without_exception_handling().
RESULTS_DIR = '.'
# niceness applied to autoserv processes launched by the scheduler
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database settings
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest installation: defaults to the parent of this file's
# directory, but can be overridden via the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names autoserv writes into an execution directory, one per
# execution phase (main run, crashinfo collection, results parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared between the dispatcher and helper functions.
_db = None  # DatabaseConnection; created and connected in init()
_shutdown = False  # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False  # set by the --test flag; swaps in a dummy autoserv
_base_url = None  # frontend URL; filled in from config in main_*()
_notify_email_statuses = []  # statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    # Top-level entry point: run the real main loop, making sure any
    # exception that escapes is written to the log before propagating.
    # The bare except is deliberate so even SystemExit/KeyboardInterrupt
    # get logged; the exception is always re-raised.
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse options, load configuration and run the dispatcher loop.

    Expects exactly one positional argument: the results directory.
    Populates module globals (RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode, _base_url) before entering the loop,
    and tears down the status server, drone manager and DB connection
    on the way out.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run, falling back to the no-op
    # dummy when no site_monitor_db module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # accept whitespace, comma, semicolon or colon as separators
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main scheduling loop; _shutdown is flipped by the SIGINT handler
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly teardown, even after an uncaught exception above
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatcher loop to exit after this tick."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """One-time scheduler startup: logging, pidfile, DB and drones.

    @param logfile - Optional path; when set, stdout/stderr are redirected
        there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # record our pid so duplicate schedulers can be detected
    utils.write_pid("monitor_db")

    if _testing_mode:
        # point at the stress-test database instead of the live one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # read the drone and results-repository hosts from the config and hand
    # them to the global drone manager
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect this process's stdout/stderr to logfile and logfile + '.err'.

    Both the underlying OS-level descriptors (via dup2, so child processes
    inherit the redirection) and the Python-level sys.stdout/sys.stderr
    objects are replaced.  Files are opened unbuffered in append mode.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # redirect at the fd level so exec'd/forked children also log here
    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the autoserv command line as a list of executable + parameters.

    @param machines - string - machine (or comma-separated machines) for -m.
    @param results_dir - string - results destination, passed via -r after
        being made absolute by the drone manager.
    @param extra_args - list - extra arguments appended to the command.
    @param job - Job object - when given, adds -u owner and -l name.
    @param queue_entry - HostQueueEntry object - used to look up the Job
        when no job argument was supplied.

    @returns list suitable for process execution.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
246
247
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent scheduling
    state (e.g. more than one atomic group label on a single host)."""
250
251
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() snapshots the DB state (ready hosts, ACLs, labels,
    dependencies) into in-memory caches; the find_eligible_* methods then
    consume those caches, popping hosts as they are handed out so a host is
    never assigned twice in one scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated SQL IN () body."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with %s filled by id_list) and build an id -> set map.

        @param flip - When True, map second column to first instead.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)} (or flipped)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of its owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host_ids, host_id -> label_ids) maps."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild all in-memory caches for one scheduling cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
        or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
        and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
        tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
        supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for one queue entry."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if it is eligible, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available and not marked invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this entry, or None if none fits."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
        atomic group scheduling.  Hosts will all belong to the same
        atomic group label as specified by the queue_entry.
        An empty list will be returned if no suitable atomic
        group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group = sorted(eligible_hosts_in_group)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
598
599
showard170873e2009-01-07 00:22:26 +0000600class Dispatcher(object):
    def __init__(self):
        # Agents currently owned by the dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes (host id -> agents, queue entry id -> agents),
        # kept in sync by add_agent()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000611
mbligh36768f02008-02-22 18:28:33 +0000612
showard915958d2009-04-22 21:00:58 +0000613 def initialize(self, recover_hosts=True):
614 self._periodic_cleanup.initialize()
615 self._24hr_upkeep.initialize()
616
jadmanski0afbb632008-06-06 21:10:57 +0000617 # always recover processes
618 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000619
jadmanski0afbb632008-06-06 21:10:57 +0000620 if recover_hosts:
621 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000622
623
    def tick(self):
        # One iteration of the main scheduling loop.  Order matters: refresh
        # drone state first, run cleanups/aborts, schedule new work, let
        # agents run, and only then flush queued drone actions and email.
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000634
showard97aed502008-11-04 02:01:24 +0000635
mblighf3294cc2009-04-08 21:17:38 +0000636 def _run_cleanup(self):
637 self._periodic_cleanup.run_cleanup_maybe()
638 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000639
mbligh36768f02008-02-22 18:28:33 +0000640
showard170873e2009-01-07 00:22:26 +0000641 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
642 for object_id in object_ids:
643 agent_dict.setdefault(object_id, set()).add(agent)
644
645
646 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
647 for object_id in object_ids:
648 assert object_id in agent_dict
649 agent_dict[object_id].remove(agent)
650
651
jadmanski0afbb632008-06-06 21:10:57 +0000652 def add_agent(self, agent):
653 self._agents.append(agent)
654 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000655 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
656 self._register_agent_for_ids(self._queue_entry_agents,
657 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000658
showard170873e2009-01-07 00:22:26 +0000659
660 def get_agents_for_entry(self, queue_entry):
661 """
662 Find agents corresponding to the specified queue_entry.
663 """
showardd3dc1992009-04-22 21:01:40 +0000664 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000665
666
667 def host_has_agent(self, host):
668 """
669 Determine if there is currently an Agent present using this host.
670 """
671 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000672
673
jadmanski0afbb632008-06-06 21:10:57 +0000674 def remove_agent(self, agent):
675 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000676 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
677 agent)
678 self._unregister_agent_for_ids(self._queue_entry_agents,
679 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000680
681
    def _recover_processes(self):
        # Re-attach to autoserv/parser processes left behind by a previous
        # scheduler instance, then requeue or reverify whatever remains.
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000692
showard170873e2009-01-07 00:22:26 +0000693
    def _register_pidfiles(self):
        """Register every pidfile of every possibly-running entry with the
        drone manager, so the subsequent refresh() will read them."""
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # register all pidfile types for the entry's execution directory;
            # we cannot tell yet which phase the entry actually reached
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000704
705
showardd3dc1992009-04-22 21:01:40 +0000706 def _recover_entries_with_status(self, status, orphans, pidfile_name,
707 recover_entries_fn):
708 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000709 for queue_entry in queue_entries:
710 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000711 # synchronous job we've already recovered
712 continue
showardd3dc1992009-04-22 21:01:40 +0000713 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000714 execution_tag = queue_entry.execution_tag()
715 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000716 run_monitor.attach_to_existing_process(execution_tag,
717 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000718
719 log_message = ('Recovering %s entry %s ' %
720 (status.lower(),
721 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000722 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000723 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000724 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000725 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000726 continue
mbligh90a549d2008-03-25 23:52:34 +0000727
showard597bfd32009-05-08 18:22:50 +0000728 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000729 run_monitor.get_process())
730 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
731 orphans.discard(run_monitor.get_process())
732
733
734 def _kill_remaining_orphan_processes(self, orphans):
735 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000736 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000737 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000738
showard170873e2009-01-07 00:22:26 +0000739
showardd3dc1992009-04-22 21:01:40 +0000740 def _recover_running_entries(self, orphans):
741 def recover_entries(job, queue_entries, run_monitor):
742 if run_monitor is not None:
743 queue_task = RecoveryQueueTask(job=job,
744 queue_entries=queue_entries,
745 run_monitor=run_monitor)
746 self.add_agent(Agent(tasks=[queue_task],
747 num_processes=len(queue_entries)))
748 # else, _requeue_other_active_entries will cover this
749
750 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
751 orphans, '.autoserv_execute',
752 recover_entries)
753
754
    def _recover_gathering_entries(self, orphans):
        """Re-attach GatherLogsTask agents to 'Gathering' entries.  The
        monitor may be None if the crashinfo process never started; the task
        is created either way."""
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)
764
765
    def _recover_parsing_entries(self, orphans):
        """Re-attach FinalReparseTask agents to 'Parsing' entries.
        num_processes=0 lets these agents bypass the dispatcher's process
        throttling (see _can_start_agent)."""
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
775
776
    def _recover_all_recoverable_entries(self):
        """Run every per-status recovery pass, then kill any autoserv
        processes none of them claimed (each pass discards the processes it
        re-attaches to from orphans)."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000783
showard97aed502008-11-04 02:01:24 +0000784
showard170873e2009-01-07 00:22:26 +0000785 def _requeue_other_active_entries(self):
786 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000787 where='active AND NOT complete AND '
788 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000789 for queue_entry in queue_entries:
790 if self.get_agents_for_entry(queue_entry):
791 # entry has already been recovered
792 continue
showardd3dc1992009-04-22 21:01:40 +0000793 if queue_entry.aborted:
794 queue_entry.abort(self)
795 continue
796
797 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000798 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000799 if queue_entry.host:
800 tasks = queue_entry.host.reverify_tasks()
801 self.add_agent(Agent(tasks))
802 agent = queue_entry.requeue()
803
804
showard1ff7b2e2009-05-15 23:17:18 +0000805 def _find_reverify(self):
806 self._reverify_hosts_where("status = 'Reverify'")
807
808
showard170873e2009-01-07 00:22:26 +0000809 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000810 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000811 self._reverify_hosts_where("""(status = 'Repairing' OR
812 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000813 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000814
showard170873e2009-01-07 00:22:26 +0000815 # recover "Running" hosts with no active queue entries, although this
816 # should never happen
817 message = ('Recovering running host %s - this probably indicates a '
818 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000819 self._reverify_hosts_where("""status = 'Running' AND
820 id NOT IN (SELECT host_id
821 FROM host_queue_entries
822 WHERE active)""",
823 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000824
825
jadmanski0afbb632008-06-06 21:10:57 +0000826 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000827 print_message='Reverifying host %s'):
828 full_where='locked = 0 AND invalid = 0 AND ' + where
829 for host in Host.fetch(where=full_where):
830 if self.host_has_agent(host):
831 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000832 continue
showard170873e2009-01-07 00:22:26 +0000833 if print_message:
showardb18134f2009-03-20 20:52:18 +0000834 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000835 tasks = host.reverify_tasks()
836 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000837
838
jadmanski0afbb632008-06-06 21:10:57 +0000839 def _recover_hosts(self):
840 # recover "Repair Failed" hosts
841 message = 'Reverifying dead host %s'
842 self._reverify_hosts_where("status = 'Repair Failed'",
843 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000844
845
showard04c82c52008-05-29 19:38:12 +0000846
showardb95b1bd2008-08-15 18:11:04 +0000847 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000848 # prioritize by job priority, then non-metahost over metahost, then FIFO
849 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000850 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000851 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000852 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000853
854
showard89f84db2009-03-12 20:39:13 +0000855 def _refresh_pending_queue_entries(self):
856 """
857 Lookup the pending HostQueueEntries and call our HostScheduler
858 refresh() method given that list. Return the list.
859
860 @returns A list of pending HostQueueEntries sorted in priority order.
861 """
showard63a34772008-08-18 19:32:50 +0000862 queue_entries = self._get_pending_queue_entries()
863 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000864 return []
showardb95b1bd2008-08-15 18:11:04 +0000865
showard63a34772008-08-18 19:32:50 +0000866 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000867
showard89f84db2009-03-12 20:39:13 +0000868 return queue_entries
869
870
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry with a host and start them all
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
897
898
899 def _schedule_new_jobs(self):
900 queue_entries = self._refresh_pending_queue_entries()
901 if not queue_entries:
902 return
903
showard63a34772008-08-18 19:32:50 +0000904 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000905 if (queue_entry.atomic_group_id is None or
906 queue_entry.host_id is not None):
907 assigned_host = self._host_scheduler.find_eligible_host(
908 queue_entry)
909 if assigned_host:
910 self._run_queue_entry(queue_entry, assigned_host)
911 else:
912 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000913
914
    def _run_queue_entry(self, queue_entry, host):
        """Start queue_entry running on the given host, adding the resulting
        Agent (if any) to the dispatcher."""
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000921
922
jadmanski0afbb632008-06-06 21:10:57 +0000923 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000924 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
925 for agent in self.get_agents_for_entry(entry):
926 agent.abort()
927 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000928
929
showard324bf812009-01-20 23:23:38 +0000930 def _can_start_agent(self, agent, num_started_this_cycle,
931 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000932 # always allow zero-process agents to run
933 if agent.num_processes == 0:
934 return True
935 # don't allow any nonzero-process agents to run after we've reached a
936 # limit (this avoids starvation of many-process agents)
937 if have_reached_limit:
938 return False
939 # total process throttling
showard324bf812009-01-20 23:23:38 +0000940 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000941 return False
942 # if a single agent exceeds the per-cycle throttling, still allow it to
943 # run when it's the first agent in the cycle
944 if num_started_this_cycle == 0:
945 return True
946 # per-cycle throttling
947 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000948 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000949 return False
950 return True
951
952
jadmanski0afbb632008-06-06 21:10:57 +0000953 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000954 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000955 have_reached_limit = False
956 # iterate over copy, so we can remove agents during iteration
957 for agent in list(self._agents):
958 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000959 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000960 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000961 continue
962 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000963 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000964 have_reached_limit):
965 have_reached_limit = True
966 continue
showard4c5374f2008-09-04 17:02:56 +0000967 num_started_this_cycle += agent.num_processes
968 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000969 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000970 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000971
972
showard29f7cd22009-04-29 21:16:24 +0000973 def _process_recurring_runs(self):
974 recurring_runs = models.RecurringRun.objects.filter(
975 start_date__lte=datetime.datetime.now())
976 for rrun in recurring_runs:
977 # Create job from template
978 job = rrun.job
979 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000980 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000981
982 host_objects = info['hosts']
983 one_time_hosts = info['one_time_hosts']
984 metahost_objects = info['meta_hosts']
985 dependencies = info['dependencies']
986 atomic_group = info['atomic_group']
987
988 for host in one_time_hosts or []:
989 this_host = models.Host.create_one_time_host(host.hostname)
990 host_objects.append(this_host)
991
992 try:
993 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000994 options=options,
showard29f7cd22009-04-29 21:16:24 +0000995 host_objects=host_objects,
996 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +0000997 atomic_group=atomic_group)
998
999 except Exception, ex:
1000 logging.exception(ex)
1001 #TODO send email
1002
1003 if rrun.loop_count == 1:
1004 rrun.delete()
1005 else:
1006 if rrun.loop_count != 0: # if not infinite loop
1007 # calculate new start_date
1008 difference = datetime.timedelta(seconds=rrun.loop_period)
1009 rrun.start_date = rrun.start_date + difference
1010 rrun.loop_count -= 1
1011 rrun.save()
1012
1013
showard170873e2009-01-07 00:22:26 +00001014class PidfileRunMonitor(object):
1015 """
1016 Client must call either run() to start a new process or
1017 attach_to_existing_process().
1018 """
mbligh36768f02008-02-22 18:28:33 +00001019
showard170873e2009-01-07 00:22:26 +00001020 class _PidfileException(Exception):
1021 """
1022 Raised when there's some unexpected behavior with the pid file, but only
1023 used internally (never allowed to escape this class).
1024 """
mbligh36768f02008-02-22 18:28:33 +00001025
1026
showard170873e2009-01-07 00:22:26 +00001027 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001028 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001029 self._start_time = None
1030 self.pidfile_id = None
1031 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001032
1033
showard170873e2009-01-07 00:22:26 +00001034 def _add_nice_command(self, command, nice_level):
1035 if not nice_level:
1036 return command
1037 return ['nice', '-n', str(nice_level)] + command
1038
1039
1040 def _set_start_time(self):
1041 self._start_time = time.time()
1042
1043
1044 def run(self, command, working_directory, nice_level=None, log_file=None,
1045 pidfile_name=None, paired_with_pidfile=None):
1046 assert command is not None
1047 if nice_level is not None:
1048 command = ['nice', '-n', str(nice_level)] + command
1049 self._set_start_time()
1050 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001051 command, working_directory, pidfile_name=pidfile_name,
1052 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001053
1054
showardd3dc1992009-04-22 21:01:40 +00001055 def attach_to_existing_process(self, execution_tag,
1056 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001057 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001058 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1059 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001060 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001061
1062
jadmanski0afbb632008-06-06 21:10:57 +00001063 def kill(self):
showard170873e2009-01-07 00:22:26 +00001064 if self.has_process():
1065 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001066
mbligh36768f02008-02-22 18:28:33 +00001067
showard170873e2009-01-07 00:22:26 +00001068 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001069 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001070 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001071
1072
showard170873e2009-01-07 00:22:26 +00001073 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001074 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001075 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001076 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001077
1078
showard170873e2009-01-07 00:22:26 +00001079 def _read_pidfile(self, use_second_read=False):
1080 assert self.pidfile_id is not None, (
1081 'You must call run() or attach_to_existing_process()')
1082 contents = _drone_manager.get_pidfile_contents(
1083 self.pidfile_id, use_second_read=use_second_read)
1084 if contents.is_invalid():
1085 self._state = drone_manager.PidfileContents()
1086 raise self._PidfileException(contents)
1087 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001088
1089
showard21baa452008-10-21 00:08:39 +00001090 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001091 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1092 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001093 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001094 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001095 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001096
1097
1098 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001099 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001100 return
mblighbb421852008-03-11 22:36:16 +00001101
showard21baa452008-10-21 00:08:39 +00001102 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001103
showard170873e2009-01-07 00:22:26 +00001104 if self._state.process is None:
1105 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001106 return
mbligh90a549d2008-03-25 23:52:34 +00001107
showard21baa452008-10-21 00:08:39 +00001108 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001109 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001110 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001111 return
mbligh90a549d2008-03-25 23:52:34 +00001112
showard170873e2009-01-07 00:22:26 +00001113 # pid but no running process - maybe process *just* exited
1114 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001115 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001116 # autoserv exited without writing an exit code
1117 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001118 self._handle_pidfile_error(
1119 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001120
showard21baa452008-10-21 00:08:39 +00001121
1122 def _get_pidfile_info(self):
1123 """\
1124 After completion, self._state will contain:
1125 pid=None, exit_status=None if autoserv has not yet run
1126 pid!=None, exit_status=None if autoserv is running
1127 pid!=None, exit_status!=None if autoserv has completed
1128 """
1129 try:
1130 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001131 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001132 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001133
1134
showard170873e2009-01-07 00:22:26 +00001135 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001136 """\
1137 Called when no pidfile is found or no pid is in the pidfile.
1138 """
showard170873e2009-01-07 00:22:26 +00001139 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001140 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001141 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1142 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001143 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001144 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001145
1146
showard35162b02009-03-03 02:17:30 +00001147 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001148 """\
1149 Called when autoserv has exited without writing an exit status,
1150 or we've timed out waiting for autoserv to write a pid to the
1151 pidfile. In either case, we just return failure and the caller
1152 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001153
showard170873e2009-01-07 00:22:26 +00001154 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001155 """
1156 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001157 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001158 self._state.exit_status = 1
1159 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001160
1161
jadmanski0afbb632008-06-06 21:10:57 +00001162 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001163 self._get_pidfile_info()
1164 return self._state.exit_status
1165
1166
1167 def num_tests_failed(self):
1168 self._get_pidfile_info()
1169 assert self._state.num_tests_failed is not None
1170 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001171
1172
mbligh36768f02008-02-22 18:28:33 +00001173class Agent(object):
showard170873e2009-01-07 00:22:26 +00001174 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001175 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001176 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001177 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001178 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001179
showard170873e2009-01-07 00:22:26 +00001180 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1181 for task in tasks)
1182 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1183
showardd3dc1992009-04-22 21:01:40 +00001184 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001185 for task in tasks:
1186 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001187
1188
showardd3dc1992009-04-22 21:01:40 +00001189 def _clear_queue(self):
1190 self.queue = Queue.Queue(0)
1191
1192
showard170873e2009-01-07 00:22:26 +00001193 def _union_ids(self, id_lists):
1194 return set(itertools.chain(*id_lists))
1195
1196
jadmanski0afbb632008-06-06 21:10:57 +00001197 def add_task(self, task):
1198 self.queue.put_nowait(task)
1199 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001200
1201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def tick(self):
showard21baa452008-10-21 00:08:39 +00001203 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001204 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001205 self.active_task.poll()
1206 if not self.active_task.is_done():
1207 return
1208 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001209
1210
jadmanski0afbb632008-06-06 21:10:57 +00001211 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001212 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001213 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001214 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001215 if not self.active_task.success:
1216 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001217 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001218
jadmanski0afbb632008-06-06 21:10:57 +00001219 if not self.is_done():
1220 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001221
1222
jadmanski0afbb632008-06-06 21:10:57 +00001223 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001224 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001225 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1226 # get reset.
1227 new_agent = Agent(self.active_task.failure_tasks)
1228 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001229
mblighe2586682008-02-29 22:45:46 +00001230
showard4c5374f2008-09-04 17:02:56 +00001231 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001232 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001233
1234
jadmanski0afbb632008-06-06 21:10:57 +00001235 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001236 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001237
1238
showardd3dc1992009-04-22 21:01:40 +00001239 def abort(self):
showard08a36412009-05-05 01:01:13 +00001240 # abort tasks until the queue is empty or a task ignores the abort
1241 while not self.is_done():
1242 if not self.active_task:
1243 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001244 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001245 if not self.active_task.aborted:
1246 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001247 return
1248 self.active_task = None
1249
showardd3dc1992009-04-22 21:01:40 +00001250
mbligh36768f02008-02-22 18:28:33 +00001251class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001252 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1253 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001254 self.done = False
1255 self.failure_tasks = failure_tasks
1256 self.started = False
1257 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001258 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001259 self.task = None
1260 self.agent = None
1261 self.monitor = None
1262 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001263 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001264 self.queue_entry_ids = []
1265 self.host_ids = []
1266 self.log_file = None
1267
1268
1269 def _set_ids(self, host=None, queue_entries=None):
1270 if queue_entries and queue_entries != [None]:
1271 self.host_ids = [entry.host.id for entry in queue_entries]
1272 self.queue_entry_ids = [entry.id for entry in queue_entries]
1273 else:
1274 assert host
1275 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001276
1277
jadmanski0afbb632008-06-06 21:10:57 +00001278 def poll(self):
showard08a36412009-05-05 01:01:13 +00001279 if not self.started:
1280 self.start()
1281 self.tick()
1282
1283
1284 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001285 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001286 exit_code = self.monitor.exit_code()
1287 if exit_code is None:
1288 return
1289 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001290 else:
1291 success = False
mbligh36768f02008-02-22 18:28:33 +00001292
jadmanski0afbb632008-06-06 21:10:57 +00001293 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001294
1295
jadmanski0afbb632008-06-06 21:10:57 +00001296 def is_done(self):
1297 return self.done
mbligh36768f02008-02-22 18:28:33 +00001298
1299
jadmanski0afbb632008-06-06 21:10:57 +00001300 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001301 if self.done:
1302 return
jadmanski0afbb632008-06-06 21:10:57 +00001303 self.done = True
1304 self.success = success
1305 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001306
1307
jadmanski0afbb632008-06-06 21:10:57 +00001308 def prolog(self):
1309 pass
mblighd64e5702008-04-04 21:39:28 +00001310
1311
jadmanski0afbb632008-06-06 21:10:57 +00001312 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001313 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001314
mbligh36768f02008-02-22 18:28:33 +00001315
jadmanski0afbb632008-06-06 21:10:57 +00001316 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001317 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001318 _drone_manager.copy_to_results_repository(
1319 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001320
1321
jadmanski0afbb632008-06-06 21:10:57 +00001322 def epilog(self):
1323 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001324
1325
jadmanski0afbb632008-06-06 21:10:57 +00001326 def start(self):
1327 assert self.agent
1328
1329 if not self.started:
1330 self.prolog()
1331 self.run()
1332
1333 self.started = True
1334
1335
1336 def abort(self):
1337 if self.monitor:
1338 self.monitor.kill()
1339 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001340 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001341 self.cleanup()
1342
1343
showard170873e2009-01-07 00:22:26 +00001344 def set_host_log_file(self, base_name, host):
1345 filename = '%s.%s' % (time.time(), base_name)
1346 self.log_file = os.path.join('hosts', host.hostname, filename)
1347
1348
showardde634ee2009-01-30 01:44:24 +00001349 def _get_consistent_execution_tag(self, queue_entries):
1350 first_execution_tag = queue_entries[0].execution_tag()
1351 for queue_entry in queue_entries[1:]:
1352 assert queue_entry.execution_tag() == first_execution_tag, (
1353 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1354 queue_entry,
1355 first_execution_tag,
1356 queue_entries[0]))
1357 return first_execution_tag
1358
1359
showarda1e74b32009-05-12 17:32:04 +00001360 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001361 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001362 if use_monitor is None:
1363 assert self.monitor
1364 use_monitor = self.monitor
1365 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001366 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001367 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001368 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001369 results_path)
showardde634ee2009-01-30 01:44:24 +00001370
showarda1e74b32009-05-12 17:32:04 +00001371
1372 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001373 reparse_task = FinalReparseTask(queue_entries)
1374 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1375
1376
showarda1e74b32009-05-12 17:32:04 +00001377 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1378 self._copy_results(queue_entries, use_monitor)
1379 self._parse_results(queue_entries)
1380
1381
showardd3dc1992009-04-22 21:01:40 +00001382 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001383 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001384 self.monitor = PidfileRunMonitor()
1385 self.monitor.run(self.cmd, self._working_directory,
1386 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001387 log_file=self.log_file,
1388 pidfile_name=pidfile_name,
1389 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001390
1391
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval line as 'key=value'."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file; subclasses must override this."""
        # Bug fix: the original raised the NotImplemented *constant*, which is
        # not an exception class and itself triggers a TypeError when raised.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file via the drone
        manager.  Silently does nothing if the monitored process is gone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the ('job_queued', <unix timestamp>) pair for the job's
        creation time."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time in its keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1419
1420
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host.  If the repair fails and a queue
    entry was supplied, that entry is failed as well."""

    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair logs go to a temporary dir; they're copied into the job's
        # results only if we end up failing the queue entry
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry now; epilog() will fail it only if repair fails
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # overrides TaskWithJobKeyvals._keyval_path()
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail self.queue_entry_to_fail, copying the repair logs into the
        job's results so the user can see why the host was failed."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # write job_queued/job_finished keyvals so the parser has timestamps
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001491
1492
class PreJobTask(AgentTask):
    """Base for tasks run against a host before the job proper (verify,
    cleanup).  When such a task fails for a real (non-metahost) entry, its
    log is preserved with the job's results so the failure is visible."""

    def epilog(self):
        super(PreJobTask, self).epilog()
        entry = self.queue_entry
        # copy the log only for a failed, non-metahost queue entry
        if not entry or self.success or entry.meta_host:
            return
        entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001505
1506
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host; a failed verify falls back to a
    RepairTask."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
                self.host.hostname, self.temp_results_dir, ['-v'],
                queue_entry=queue_entry)
        repair_task = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=[repair_task])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001537
1538
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs the main autoserv process for a job's queue entries.

    Writes pre-job keyvals (job_queued, per-host keyvals, optional
    host_group_name) before the process starts, and schedules a
    GatherLogsTask once the process finishes or is aborted.
    """

    def __init__(self, job, queue_entries, cmd, group_name=''):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # overrides TaskWithJobKeyvals._keyval_path()
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path, to be written when
        the job's execution directory is created."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write one host's platform/labels keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries in a job group share one execution tag; use the first
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write all pre-job keyvals and mark entries/hosts as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker in the results explaining why there's no status log
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Common completion work for both normal exit and abort: write the
        finished keyval, schedule log gathering, handle lost processes."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_tag(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (and when) in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no user recorded; attribute the abort to the system itself
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001666
1667
class RecoveryQueueTask(QueueTask):
    """QueueTask variant that re-attaches to an already-running autoserv
    process (used when the scheduler restarts) instead of launching one."""

    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        """Adopt the pre-existing monitor rather than spawning a process."""
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001681
1682
class PostJobTask(AgentTask):
    """Base class for tasks that run over a job's results after the main
    autoserv process has exited (crashinfo collection, final reparse)."""

    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name
        self._run_monitor = run_monitor

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command line (list) to run; subclasses must override."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if this job's queue entries were aborted.

        All entries should agree; on inconsistency, email a warning and
        assume aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: the original joined a single formatted string
                # (i.e. its characters) -- the generator over all queue
                # entries was missing.
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        ', '.join('%s (%s)' % (entry, entry.aborted)
                                  for entry in self._queue_entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map abort state and autoserv's exit code to a final HQE status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        if self._run_monitor is not None:
            # recovery case: adopt the already-running process
            self.monitor = self._run_monitor
        else:
            # make sure we actually have results to work with.
            # this should never happen in normal operation.
            if not self._autoserv_monitor.has_process():
                email_manager.manager.enqueue_notify_email(
                        'No results in post-job task',
                        'No results in post-job task at %s' %
                        self._autoserv_monitor.pidfile_id)
                self.finished(False)
                return

            super(PostJobTask, self).run(
                    pidfile_name=self._pidfile_name,
                    paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1769
1770
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the 'autoserv --collect-crashinfo' command line."""
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m',
                ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _should_reboot_hosts(self):
        """Decide whether hosts get a cleanup (reboot) based on the job's
        reboot_after setting and the final job outcome."""
        reboot_after = self._job.reboot_after
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            return True
        if reboot_after == models.RebootAfter.ALWAYS:
            return True
        if reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            job_completed = (
                    self._final_status == models.HostQueueEntry.Status.COMPLETED)
            return (job_completed
                    and self._autoserv_monitor.num_tests_failed() == 0)
        return False


    def _reboot_hosts(self):
        do_reboot = self._should_reboot_hosts()
        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        """Only actually run collect-crashinfo if autoserv died abnormally
        (was signalled, or left no exit code at all)."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        died_abnormally = (autoserv_exit_code is None
                           or os.WIFSIGNALED(autoserv_exit_code))
        if died_abnormally:
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001838
1839
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host; a failed cleanup falls back to a
    RepairTask."""

    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        failure_tasks = [RepairTask(host, queue_entry=queue_entry)]
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=failure_tasks)

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1871
1872
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a finished job's results directory.

    A class-level counter caps concurrent parses at
    scheduler_config.config.max_parse_processes; a task that cannot start
    yet retries on every tick() until a slot frees up.
    """
    # number of parse processes currently running, shared across instances
    _num_running_parses = 0

    def __init__(self, queue_entries, run_monitor=None):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               pidfile_name=_PARSER_PID_FILE,
                                               logfile_name='.parse.log',
                                               run_monitor=run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # throttle: only allow up to max_parse_processes concurrent parses
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # final status was computed by PostJobTask from abort state and
        # autoserv's exit code
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        # overrides PostJobTask._generate_command()
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a parse slot if we actually claimed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001946
1947
class SetEntryPendingTask(AgentTask):
    """Process-less task that transitions a queue entry to Pending and
    schedules whatever agent (if any) that transition produces."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        pending_agent = self._queue_entry.on_pending()
        if pending_agent:
            self.agent.dispatcher.add_agent(pending_agent)
        self.finished(True)
1960
1961
class DBError(Exception):
    """Raised by the DBObject constructor when its select fails (i.e. no row
    exists for the requested id)."""
1964
1965
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.  Weak values, so cached
    # instances can still be garbage collected when nothing else refers
    # to them.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    # set True (per instance) once field values have been loaded from a row
    _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001979
showarda3c58572009-03-12 20:36:59 +00001980
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # NOTE(review): the extra keyword args are forwarded to
        # object.__new__, which silently ignores them on Python 2 but would
        # raise TypeError under Python 3 -- revisit if the file is ported.
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1991
1992
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load (or refresh) this object's fields from the database.

        @param id - id of the row to fetch; mutually exclusive with row.
        @param row - a pre-fetched row (sequence matching _fields).
        @param new_record - True if this object has not been saved yet.
        @param always_query - if False and this (cached) instance was already
                initialized, skip the requery entirely.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # already loaded once; warn if the requery changed any fields
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                        'initialized %s %s instance requery is updating: %s',
                        type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2021
2022
    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()
2027
2028
showardccbd6c52009-03-21 00:10:21 +00002029 def _fetch_row_from_db(self, row_id):
2030 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2031 rows = _db.execute(sql, (row_id,))
2032 if not rows:
showard76e29d12009-04-15 21:53:10 +00002033 raise DBError("row not found (table=%s, row id=%s)"
2034 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002035 return rows[0]
2036
2037
showarda3c58572009-03-12 20:36:59 +00002038 def _assert_row_length(self, row):
2039 assert len(row) == len(self._fields), (
2040 "table = %s, row = %s/%d, fields = %s/%d" % (
2041 self.__table, row, len(row), self._fields, len(self._fields)))
2042
2043
2044 def _compare_fields_in_row(self, row):
2045 """
2046 Given a row as returned by a SELECT query, compare it to our existing
2047 in memory fields.
2048
2049 @param row - A sequence of values corresponding to fields named in
2050 The class attribute _fields.
2051
2052 @returns A dictionary listing the differences keyed by field name
2053 containing tuples of (current_value, row_value).
2054 """
2055 self._assert_row_length(row)
2056 differences = {}
2057 for field, row_value in itertools.izip(self._fields, row):
2058 current_value = getattr(self, field)
2059 if current_value != row_value:
2060 differences[field] = (current_value, row_value)
2061 return differences
showard2bab8f42008-11-12 18:15:22 +00002062
2063
2064 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002065 """
2066 Update our field attributes using a single row returned by SELECT.
2067
2068 @param row - A sequence of values corresponding to fields named in
2069 the class fields list.
2070 """
2071 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002072
showard2bab8f42008-11-12 18:15:22 +00002073 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002074 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002075 setattr(self, field, value)
2076 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002077
showard2bab8f42008-11-12 18:15:22 +00002078 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002079
mblighe2586682008-02-29 22:45:46 +00002080
showardccbd6c52009-03-21 00:10:21 +00002081 def update_from_database(self):
2082 assert self.id is not None
2083 row = self._fetch_row_from_db(self.id)
2084 self._update_fields_from_row(row)
2085
2086
jadmanski0afbb632008-06-06 21:10:57 +00002087 def count(self, where, table = None):
2088 if not table:
2089 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002090
jadmanski0afbb632008-06-06 21:10:57 +00002091 rows = _db.execute("""
2092 SELECT count(*) FROM %s
2093 WHERE %s
2094 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002095
jadmanski0afbb632008-06-06 21:10:57 +00002096 assert len(rows) == 1
2097
2098 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002099
2100
showardd3dc1992009-04-22 21:01:40 +00002101 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002102 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002103
showard2bab8f42008-11-12 18:15:22 +00002104 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002105 return
mbligh36768f02008-02-22 18:28:33 +00002106
mblighf8c624d2008-07-03 16:58:45 +00002107 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002108 _db.execute(query, (value, self.id))
2109
showard2bab8f42008-11-12 18:15:22 +00002110 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002111
2112
jadmanski0afbb632008-06-06 21:10:57 +00002113 def save(self):
2114 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002115 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002116 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002117 values = []
2118 for key in keys:
2119 value = getattr(self, key)
2120 if value is None:
2121 values.append('NULL')
2122 else:
2123 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002124 values_str = ','.join(values)
2125 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2126 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002127 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002128 # Update our id to the one the database just assigned to us.
2129 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002130
2131
jadmanski0afbb632008-06-06 21:10:57 +00002132 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002133 self._instances_by_type_and_id.pop((type(self), id), None)
2134 self._initialized = False
2135 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002136 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2137 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002138
2139
showard63a34772008-08-18 19:32:50 +00002140 @staticmethod
2141 def _prefix_with(string, prefix):
2142 if string:
2143 string = prefix + string
2144 return string
2145
2146
jadmanski0afbb632008-06-06 21:10:57 +00002147 @classmethod
showard989f25d2008-10-01 11:38:11 +00002148 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002149 """
2150 Construct instances of our class based on the given database query.
2151
2152 @yields One class instance for each row fetched.
2153 """
showard63a34772008-08-18 19:32:50 +00002154 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2155 where = cls._prefix_with(where, 'WHERE ')
2156 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002157 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002158 'joins' : joins,
2159 'where' : where,
2160 'order_by' : order_by})
2161 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002162 for row in rows:
2163 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002164
mbligh36768f02008-02-22 18:28:33 +00002165
class IneligibleHostQueue(DBObject):
    """A (job, host) block record.

    Created by HostQueueEntry.block_host() when a host is assigned to a job,
    and removed by unblock_host(), so the same host is not scheduled again
    for that job.
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002169
2170
showard89f84db2009-03-12 20:39:13 +00002171class AtomicGroup(DBObject):
2172 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002173 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2174 'invalid')
showard89f84db2009-03-12 20:39:13 +00002175
2176
showard989f25d2008-10-01 11:38:11 +00002177class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002178 _table_name = 'labels'
2179 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002180 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002181
2182
mbligh36768f02008-02-22 18:28:33 +00002183class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002184 _table_name = 'hosts'
2185 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2186 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2187
2188
jadmanski0afbb632008-06-06 21:10:57 +00002189 def current_task(self):
2190 rows = _db.execute("""
2191 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2192 """, (self.id,))
2193
2194 if len(rows) == 0:
2195 return None
2196 else:
2197 assert len(rows) == 1
2198 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002199 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002200
2201
jadmanski0afbb632008-06-06 21:10:57 +00002202 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002203 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002204 if self.current_task():
2205 self.current_task().requeue()
2206
showard6ae5ea92009-02-25 00:11:51 +00002207
jadmanski0afbb632008-06-06 21:10:57 +00002208 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002209 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002210 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002211
2212
showard170873e2009-01-07 00:22:26 +00002213 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002214 """
showard170873e2009-01-07 00:22:26 +00002215 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002216 """
2217 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002218 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002219 FROM labels
2220 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002221 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002222 ORDER BY labels.name
2223 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002224 platform = None
2225 all_labels = []
2226 for label_name, is_platform in rows:
2227 if is_platform:
2228 platform = label_name
2229 all_labels.append(label_name)
2230 return platform, all_labels
2231
2232
2233 def reverify_tasks(self):
2234 cleanup_task = CleanupTask(host=self)
2235 verify_task = VerifyTask(host=self)
2236 # just to make sure this host does not get taken away
2237 self.set_status('Cleaning')
2238 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002239
2240
mbligh36768f02008-02-22 18:28:33 +00002241class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002242 _table_name = 'host_queue_entries'
2243 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002244 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002245 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002246
2247
showarda3c58572009-03-12 20:36:59 +00002248 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002249 assert id or row
showarda3c58572009-03-12 20:36:59 +00002250 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002251 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002252
jadmanski0afbb632008-06-06 21:10:57 +00002253 if self.host_id:
2254 self.host = Host(self.host_id)
2255 else:
2256 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002257
showard170873e2009-01-07 00:22:26 +00002258 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002259 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002260
2261
showard89f84db2009-03-12 20:39:13 +00002262 @classmethod
2263 def clone(cls, template):
2264 """
2265 Creates a new row using the values from a template instance.
2266
2267 The new instance will not exist in the database or have a valid
2268 id attribute until its save() method is called.
2269 """
2270 assert isinstance(template, cls)
2271 new_row = [getattr(template, field) for field in cls._fields]
2272 clone = cls(row=new_row, new_record=True)
2273 clone.id = None
2274 return clone
2275
2276
showardc85c21b2008-11-24 22:17:37 +00002277 def _view_job_url(self):
2278 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2279
2280
showardf1ae3542009-05-11 19:26:02 +00002281 def get_labels(self):
2282 """
2283 Get all labels associated with this host queue entry (either via the
2284 meta_host or as a job dependency label). The labels yielded are not
2285 guaranteed to be unique.
2286
2287 @yields Label instances associated with this host_queue_entry.
2288 """
2289 if self.meta_host:
2290 yield Label(id=self.meta_host, always_query=False)
2291 labels = Label.fetch(
2292 joins="JOIN jobs_dependency_labels AS deps "
2293 "ON (labels.id = deps.label_id)",
2294 where="deps.job_id = %d" % self.job.id)
2295 for label in labels:
2296 yield label
2297
2298
jadmanski0afbb632008-06-06 21:10:57 +00002299 def set_host(self, host):
2300 if host:
2301 self.queue_log_record('Assigning host ' + host.hostname)
2302 self.update_field('host_id', host.id)
2303 self.update_field('active', True)
2304 self.block_host(host.id)
2305 else:
2306 self.queue_log_record('Releasing host')
2307 self.unblock_host(self.host.id)
2308 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002309
jadmanski0afbb632008-06-06 21:10:57 +00002310 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002311
2312
jadmanski0afbb632008-06-06 21:10:57 +00002313 def get_host(self):
2314 return self.host
mbligh36768f02008-02-22 18:28:33 +00002315
2316
jadmanski0afbb632008-06-06 21:10:57 +00002317 def queue_log_record(self, log_line):
2318 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002319 _drone_manager.write_lines_to_file(self.queue_log_path,
2320 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002321
2322
jadmanski0afbb632008-06-06 21:10:57 +00002323 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002324 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002325 row = [0, self.job.id, host_id]
2326 block = IneligibleHostQueue(row=row, new_record=True)
2327 block.save()
mblighe2586682008-02-29 22:45:46 +00002328
2329
jadmanski0afbb632008-06-06 21:10:57 +00002330 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002331 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002332 blocks = IneligibleHostQueue.fetch(
2333 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2334 for block in blocks:
2335 block.delete()
mblighe2586682008-02-29 22:45:46 +00002336
2337
showard2bab8f42008-11-12 18:15:22 +00002338 def set_execution_subdir(self, subdir=None):
2339 if subdir is None:
2340 assert self.get_host()
2341 subdir = self.get_host().hostname
2342 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002343
2344
showard6355f6b2008-12-05 18:52:13 +00002345 def _get_hostname(self):
2346 if self.host:
2347 return self.host.hostname
2348 return 'no host'
2349
2350
showard170873e2009-01-07 00:22:26 +00002351 def __str__(self):
2352 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2353
2354
jadmanski0afbb632008-06-06 21:10:57 +00002355 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002356 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002357
showardb18134f2009-03-20 20:52:18 +00002358 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002359
showardc85c21b2008-11-24 22:17:37 +00002360 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002361 self.update_field('complete', False)
2362 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002363
jadmanski0afbb632008-06-06 21:10:57 +00002364 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002365 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002366 self.update_field('complete', False)
2367 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002368
showardc85c21b2008-11-24 22:17:37 +00002369 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002370 self.update_field('complete', True)
2371 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002372
2373 should_email_status = (status.lower() in _notify_email_statuses or
2374 'all' in _notify_email_statuses)
2375 if should_email_status:
2376 self._email_on_status(status)
2377
2378 self._email_on_job_complete()
2379
2380
2381 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002382 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002383
2384 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2385 self.job.id, self.job.name, hostname, status)
2386 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2387 self.job.id, self.job.name, hostname, status,
2388 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002389 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002390
2391
2392 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002393 if not self.job.is_finished():
2394 return
showard542e8402008-09-19 20:16:18 +00002395
showardc85c21b2008-11-24 22:17:37 +00002396 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002397 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002398 for queue_entry in hosts_queue:
2399 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002400 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002401 queue_entry.status))
2402
2403 summary_text = "\n".join(summary_text)
2404 status_counts = models.Job.objects.get_status_counts(
2405 [self.job.id])[self.job.id]
2406 status = ', '.join('%d %s' % (count, status) for status, count
2407 in status_counts.iteritems())
2408
2409 subject = 'Autotest: Job ID: %s "%s" %s' % (
2410 self.job.id, self.job.name, status)
2411 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2412 self.job.id, self.job.name, status, self._view_job_url(),
2413 summary_text)
showard170873e2009-01-07 00:22:26 +00002414 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002415
2416
showard89f84db2009-03-12 20:39:13 +00002417 def run(self, assigned_host=None):
2418 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002419 assert assigned_host
2420 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002421 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002422
showardb18134f2009-03-20 20:52:18 +00002423 logging.info("%s/%s/%s scheduled on %s, status=%s",
2424 self.job.name, self.meta_host, self.atomic_group_id,
2425 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002426
jadmanski0afbb632008-06-06 21:10:57 +00002427 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002428
showard6ae5ea92009-02-25 00:11:51 +00002429
jadmanski0afbb632008-06-06 21:10:57 +00002430 def requeue(self):
2431 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002432 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002433 # verify/cleanup failure sets the execution subdir, so reset it here
2434 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002435 if self.meta_host:
2436 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002437
2438
jadmanski0afbb632008-06-06 21:10:57 +00002439 def handle_host_failure(self):
2440 """\
2441 Called when this queue entry's host has failed verification and
2442 repair.
2443 """
2444 assert not self.meta_host
2445 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002446 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002447
2448
jadmanskif7fa2cc2008-10-01 14:13:23 +00002449 @property
2450 def aborted_by(self):
2451 self._load_abort_info()
2452 return self._aborted_by
2453
2454
2455 @property
2456 def aborted_on(self):
2457 self._load_abort_info()
2458 return self._aborted_on
2459
2460
2461 def _load_abort_info(self):
2462 """ Fetch info about who aborted the job. """
2463 if hasattr(self, "_aborted_by"):
2464 return
2465 rows = _db.execute("""
2466 SELECT users.login, aborted_host_queue_entries.aborted_on
2467 FROM aborted_host_queue_entries
2468 INNER JOIN users
2469 ON users.id = aborted_host_queue_entries.aborted_by_id
2470 WHERE aborted_host_queue_entries.queue_entry_id = %s
2471 """, (self.id,))
2472 if rows:
2473 self._aborted_by, self._aborted_on = rows[0]
2474 else:
2475 self._aborted_by = self._aborted_on = None
2476
2477
showardb2e2c322008-10-14 17:33:55 +00002478 def on_pending(self):
2479 """
2480 Called when an entry in a synchronous job has passed verify. If the
2481 job is ready to run, returns an agent to run the job. Returns None
2482 otherwise.
2483 """
2484 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002485 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002486 if self.job.is_ready():
2487 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002488 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002489 return None
2490
2491
showardd3dc1992009-04-22 21:01:40 +00002492 def abort(self, dispatcher):
2493 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002494
showardd3dc1992009-04-22 21:01:40 +00002495 Status = models.HostQueueEntry.Status
2496 has_running_job_agent = (
2497 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2498 and dispatcher.get_agents_for_entry(self))
2499 if has_running_job_agent:
2500 # do nothing; post-job tasks will finish and then mark this entry
2501 # with status "Aborted" and take care of the host
2502 return
2503
2504 if self.status in (Status.STARTING, Status.PENDING):
2505 self.host.set_status(models.Host.Status.READY)
2506 elif self.status == Status.VERIFYING:
2507 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2508
2509 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002510
2511 def execution_tag(self):
2512 assert self.execution_subdir
2513 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002514
2515
mbligh36768f02008-02-22 18:28:33 +00002516class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002517 _table_name = 'jobs'
2518 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2519 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002520 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002521 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002522
2523
showarda3c58572009-03-12 20:36:59 +00002524 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002525 assert id or row
showarda3c58572009-03-12 20:36:59 +00002526 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002527
mblighe2586682008-02-29 22:45:46 +00002528
jadmanski0afbb632008-06-06 21:10:57 +00002529 def is_server_job(self):
2530 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002531
2532
showard170873e2009-01-07 00:22:26 +00002533 def tag(self):
2534 return "%s-%s" % (self.id, self.owner)
2535
2536
jadmanski0afbb632008-06-06 21:10:57 +00002537 def get_host_queue_entries(self):
2538 rows = _db.execute("""
2539 SELECT * FROM host_queue_entries
2540 WHERE job_id= %s
2541 """, (self.id,))
2542 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002543
jadmanski0afbb632008-06-06 21:10:57 +00002544 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002545
jadmanski0afbb632008-06-06 21:10:57 +00002546 return entries
mbligh36768f02008-02-22 18:28:33 +00002547
2548
jadmanski0afbb632008-06-06 21:10:57 +00002549 def set_status(self, status, update_queues=False):
2550 self.update_field('status',status)
2551
2552 if update_queues:
2553 for queue_entry in self.get_host_queue_entries():
2554 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002555
2556
jadmanski0afbb632008-06-06 21:10:57 +00002557 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002558 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2559 status='Pending')
2560 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002561
2562
jadmanski0afbb632008-06-06 21:10:57 +00002563 def num_machines(self, clause = None):
2564 sql = "job_id=%s" % self.id
2565 if clause:
2566 sql += " AND (%s)" % clause
2567 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002568
2569
jadmanski0afbb632008-06-06 21:10:57 +00002570 def num_queued(self):
2571 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002572
2573
jadmanski0afbb632008-06-06 21:10:57 +00002574 def num_active(self):
2575 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002576
2577
jadmanski0afbb632008-06-06 21:10:57 +00002578 def num_complete(self):
2579 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002580
2581
jadmanski0afbb632008-06-06 21:10:57 +00002582 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002583 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002584
mbligh36768f02008-02-22 18:28:33 +00002585
showard6bb7c292009-01-30 01:44:51 +00002586 def _not_yet_run_entries(self, include_verifying=True):
2587 statuses = [models.HostQueueEntry.Status.QUEUED,
2588 models.HostQueueEntry.Status.PENDING]
2589 if include_verifying:
2590 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2591 return models.HostQueueEntry.objects.filter(job=self.id,
2592 status__in=statuses)
2593
2594
2595 def _stop_all_entries(self):
2596 entries_to_stop = self._not_yet_run_entries(
2597 include_verifying=False)
2598 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002599 assert not child_entry.complete, (
2600 '%s status=%s, active=%s, complete=%s' %
2601 (child_entry.id, child_entry.status, child_entry.active,
2602 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002603 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2604 child_entry.host.status = models.Host.Status.READY
2605 child_entry.host.save()
2606 child_entry.status = models.HostQueueEntry.Status.STOPPED
2607 child_entry.save()
2608
showard2bab8f42008-11-12 18:15:22 +00002609 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002610 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002611 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002612 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002613
2614
jadmanski0afbb632008-06-06 21:10:57 +00002615 def write_to_machines_file(self, queue_entry):
2616 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002617 file_path = os.path.join(self.tag(), '.machines')
2618 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002619
2620
showardf1ae3542009-05-11 19:26:02 +00002621 def _next_group_name(self, group_name=''):
2622 """@returns a directory name to use for the next host group results."""
2623 if group_name:
2624 # Sanitize for use as a pathname.
2625 group_name = group_name.replace(os.path.sep, '_')
2626 if group_name.startswith('.'):
2627 group_name = '_' + group_name[1:]
2628 # Add a separator between the group name and 'group%d'.
2629 group_name += '.'
2630 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002631 query = models.HostQueueEntry.objects.filter(
2632 job=self.id).values('execution_subdir').distinct()
2633 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002634 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2635 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002636 if ids:
2637 next_id = max(ids) + 1
2638 else:
2639 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002640 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002641
2642
showard170873e2009-01-07 00:22:26 +00002643 def _write_control_file(self, execution_tag):
2644 control_path = _drone_manager.attach_file_to_execution(
2645 execution_tag, self.control_file)
2646 return control_path
mbligh36768f02008-02-22 18:28:33 +00002647
showardb2e2c322008-10-14 17:33:55 +00002648
showard2bab8f42008-11-12 18:15:22 +00002649 def get_group_entries(self, queue_entry_from_group):
2650 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002651 return list(HostQueueEntry.fetch(
2652 where='job_id=%s AND execution_subdir=%s',
2653 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002654
2655
showardb2e2c322008-10-14 17:33:55 +00002656 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002657 assert queue_entries
2658 execution_tag = queue_entries[0].execution_tag()
2659 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002660 hostnames = ','.join([entry.get_host().hostname
2661 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002662
showard87ba02a2009-04-20 19:37:32 +00002663 params = _autoserv_command_line(
2664 hostnames, execution_tag,
2665 ['-P', execution_tag, '-n',
2666 _drone_manager.absolute_path(control_path)],
2667 job=self)
mbligh36768f02008-02-22 18:28:33 +00002668
jadmanski0afbb632008-06-06 21:10:57 +00002669 if not self.is_server_job():
2670 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002671
showardb2e2c322008-10-14 17:33:55 +00002672 return params
mblighe2586682008-02-29 22:45:46 +00002673
mbligh36768f02008-02-22 18:28:33 +00002674
showardc9ae1782009-01-30 01:42:37 +00002675 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002676 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002677 return True
showard0fc38302008-10-23 00:44:07 +00002678 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002679 return queue_entry.get_host().dirty
2680 return False
showard21baa452008-10-21 00:08:39 +00002681
showardc9ae1782009-01-30 01:42:37 +00002682
2683 def _should_run_verify(self, queue_entry):
2684 do_not_verify = (queue_entry.host.protection ==
2685 host_protections.Protection.DO_NOT_VERIFY)
2686 if do_not_verify:
2687 return False
2688 return self.run_verify
2689
2690
2691 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002692 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002693 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002694 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002695 if self._should_run_verify(queue_entry):
2696 tasks.append(VerifyTask(queue_entry=queue_entry))
2697 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002698 return tasks
2699
2700
showardf1ae3542009-05-11 19:26:02 +00002701 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002702 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002703 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002704 else:
showardf1ae3542009-05-11 19:26:02 +00002705 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002706 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002707 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002708 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002709
2710 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002711 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002712
2713
2714 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002715 """
2716 @returns A tuple containing a list of HostQueueEntry instances to be
2717 used to run this Job, a string group name to suggest giving
2718 to this job a results database.
2719 """
2720 if include_queue_entry.atomic_group_id:
2721 atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
2722 always_query=False)
2723 else:
2724 atomic_group = None
2725
showard2bab8f42008-11-12 18:15:22 +00002726 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002727 if atomic_group:
2728 num_entries_wanted = atomic_group.max_number_of_machines
2729 else:
2730 num_entries_wanted = self.synch_count
2731 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002732
showardf1ae3542009-05-11 19:26:02 +00002733 if num_entries_wanted > 0:
2734 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard2bab8f42008-11-12 18:15:22 +00002735 pending_entries = HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002736 where=where_clause,
2737 params=(self.id, include_queue_entry.id))
2738 # TODO(gps): sort these by hostname before slicing.
2739 chosen_entries += list(pending_entries)[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002740
showardf1ae3542009-05-11 19:26:02 +00002741 # Sanity check. We'll only ever be called if this can be met.
2742 assert len(chosen_entries) >= self.synch_count
2743
2744 if atomic_group:
2745 # Look at any meta_host and dependency labels and pick the first
2746 # one that also specifies this atomic group. Use that label name
2747 # as the group name if possible (it is more specific).
2748 group_name = atomic_group.name
2749 for label in include_queue_entry.get_labels():
2750 if label.atomic_group_id:
2751 assert label.atomic_group_id == atomic_group.id
2752 group_name = label.name
2753 break
2754 else:
2755 group_name = ''
2756
2757 self._assign_new_group(chosen_entries, group_name=group_name)
2758 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002759
2760
2761 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002762 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002763 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2764 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002765
showardf1ae3542009-05-11 19:26:02 +00002766 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2767 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002768
2769
showardf1ae3542009-05-11 19:26:02 +00002770 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002771 for queue_entry in queue_entries:
2772 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002773 params = self._get_autoserv_params(queue_entries)
2774 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002775 cmd=params, group_name=group_name)
2776 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002777 entry_ids = [entry.id for entry in queue_entries]
2778
showard170873e2009-01-07 00:22:26 +00002779 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002780
2781
mbligh36768f02008-02-22 18:28:33 +00002782if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002783 main()