blob: d48e63671314b7611ddc9f284cf4ce24139f0875 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow an explicit install dir to override the path relative to this file.
# NOTE: `'key' in os.environ` replaces the Python-2-only dict.has_key() and
# behaves identically on Python 2.2+.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state, initialized by init() / main()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Top-level entry point: run the scheduler, logging any escaping error."""
    try:
        main_without_exception_handling()
    except:
        # Deliberately bare: capture absolutely everything that escapes the
        # scheduler (including SystemExit/KeyboardInterrupt), log a traceback,
        # then re-raise so the process still terminates with the real error.
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """Parse command-line options, configure module globals and run the
    dispatcher loop until a shutdown is requested or an error escapes.

    Expects exactly one positional argument: the results directory.
    Mutates module globals (RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode, _base_url) before entering the loop.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific init hook if one is installed, else the dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main scheduling loop: one tick per pause interval until SIGINT
        # (see handle_sigint) flips the _shutdown flag.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately bare: report any failure by email, then fall through
        # to the orderly shutdown below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the dispatcher loop.

    Sets the module-level _shutdown flag, which the main loop polls once
    per tick; the process then exits through the normal shutdown path.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """One-time scheduler initialization.

    Sets up logging redirection, records our pid, connects the global
    database handle, enables Django autocommit, installs the SIGINT handler
    and initializes the drone manager from the global config.

    @param logfile - path to redirect stdout/stderr to, or None/'' to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Drone hosts to execute on, and the host results are copied back to.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect this process's stdout/stderr into logfile and logfile + '.err'.

    Both the Python-level sys.stdout/sys.stderr objects and the underlying OS
    file descriptors are redirected, so output from child processes and C
    libraries is captured as well.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    # Unbuffered append so log lines hit disk immediately.
    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the command line used to launch autoserv.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    return command + extra_args
246
247
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
250
251
class HostScheduler(object):
    """Matches pending HostQueueEntries against ready hosts.

    All scheduling state (ready hosts, job ACLs, job dependencies, labels)
    is bulk-loaded from the database by refresh() at the start of each
    scheduling pass, then consulted and mutated purely in memory as hosts
    are handed out via find_eligible_host()/find_eligible_atomic_group().
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, 'Ready', inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render ids as a comma-separated string for use in SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query and fold the rows into a one-to-many dict.

        @param query - SQL selecting (left_id, right_id) pairs, containing a
                single %s placeholder that is filled with id_list.
        @param id_list - ids to substitute into the query; empty yields {}.
        @param flip - if True, map right_id -> set(left_ids) instead.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left_id: set(right_ids)} (or flipped)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may NOT run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) dicts for the given hosts."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Bulk-load all scheduling state for one pass over the given entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every dependency label of the job."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes the ACL, dependency, only_if_needed and
        atomic-group checks for this queue entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the specific host requested by queue_entry, or
        None if that host is ineligible or no longer available."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still claimable in this scheduling cycle."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim and return any eligible host carrying the entry's metahost
        label, or None if none qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for a non-atomic-group queue entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group = sorted(eligible_hosts_in_group)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
598
599
class Dispatcher(object):
    """Drives the main scheduling loop: owns all active Agents, runs the
    periodic cleanup tasks and coordinates host scheduling on every tick."""

    def __init__(self):
        # Agents currently owned by the dispatcher (see add_agent/remove_agent).
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes: host id / queue entry id -> set of Agents using it.
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000611
mbligh36768f02008-02-22 18:28:33 +0000612
    def initialize(self, recover_hosts=True):
        """Perform startup recovery before the first tick.

        @param recover_hosts - If True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000622
623
    def tick(self):
        """One iteration of the main scheduling loop.

        Order matters: refresh drone state first, then process aborts and new
        work, and only flush drone actions and queued email at the very end.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000633
showard97aed502008-11-04 02:01:24 +0000634
    def _run_cleanup(self):
        # Each cleanup task internally decides whether its interval has
        # elapsed, so this is cheap to call every tick.
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000638
mbligh36768f02008-02-22 18:28:33 +0000639
showard170873e2009-01-07 00:22:26 +0000640 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
641 for object_id in object_ids:
642 agent_dict.setdefault(object_id, set()).add(agent)
643
644
645 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
646 for object_id in object_ids:
647 assert object_id in agent_dict
648 agent_dict[object_id].remove(agent)
649
650
jadmanski0afbb632008-06-06 21:10:57 +0000651 def add_agent(self, agent):
652 self._agents.append(agent)
653 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000654 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
655 self._register_agent_for_ids(self._queue_entry_agents,
656 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000657
showard170873e2009-01-07 00:22:26 +0000658
659 def get_agents_for_entry(self, queue_entry):
660 """
661 Find agents corresponding to the specified queue_entry.
662 """
showardd3dc1992009-04-22 21:01:40 +0000663 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000664
665
666 def host_has_agent(self, host):
667 """
668 Determine if there is currently an Agent present using this host.
669 """
670 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000671
672
jadmanski0afbb632008-06-06 21:10:57 +0000673 def remove_agent(self, agent):
674 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000675 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
676 agent)
677 self._unregister_agent_for_ids(self._queue_entry_agents,
678 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000679
680
    def _recover_processes(self):
        """Reattach to autoserv/parser processes left over from a previous
        scheduler run, requeue what cannot be recovered, and reset drones."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000691
showard170873e2009-01-07 00:22:26 +0000692
693 def _register_pidfiles(self):
694 # during recovery we may need to read pidfiles for both running and
695 # parsing entries
696 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000697 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000698 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000699 for pidfile_name in _ALL_PIDFILE_NAMES:
700 pidfile_id = _drone_manager.get_pidfile_id_from(
701 queue_entry.execution_tag(), pidfile_name=pidfile_name)
702 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000703
704
showardd3dc1992009-04-22 21:01:40 +0000705 def _recover_entries_with_status(self, status, orphans, pidfile_name,
706 recover_entries_fn):
707 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000708 for queue_entry in queue_entries:
709 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000710 # synchronous job we've already recovered
711 continue
showardd3dc1992009-04-22 21:01:40 +0000712 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000713 execution_tag = queue_entry.execution_tag()
714 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000715 run_monitor.attach_to_existing_process(execution_tag,
716 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000717
718 log_message = ('Recovering %s entry %s ' %
719 (status.lower(),
720 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000721 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000722 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000723 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000724 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000725 continue
mbligh90a549d2008-03-25 23:52:34 +0000726
showard597bfd32009-05-08 18:22:50 +0000727 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000728 run_monitor.get_process())
729 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
730 orphans.discard(run_monitor.get_process())
731
732
733 def _kill_remaining_orphan_processes(self, orphans):
734 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000735 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000736 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000737
showard170873e2009-01-07 00:22:26 +0000738
showardd3dc1992009-04-22 21:01:40 +0000739 def _recover_running_entries(self, orphans):
740 def recover_entries(job, queue_entries, run_monitor):
741 if run_monitor is not None:
742 queue_task = RecoveryQueueTask(job=job,
743 queue_entries=queue_entries,
744 run_monitor=run_monitor)
745 self.add_agent(Agent(tasks=[queue_task],
746 num_processes=len(queue_entries)))
747 # else, _requeue_other_active_entries will cover this
748
749 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
750 orphans, '.autoserv_execute',
751 recover_entries)
752
753
754 def _recover_gathering_entries(self, orphans):
755 def recover_entries(job, queue_entries, run_monitor):
756 gather_task = GatherLogsTask(job, queue_entries,
757 run_monitor=run_monitor)
758 self.add_agent(Agent([gather_task]))
759
760 self._recover_entries_with_status(
761 models.HostQueueEntry.Status.GATHERING,
762 orphans, _CRASHINFO_PID_FILE, recover_entries)
763
764
765 def _recover_parsing_entries(self, orphans):
766 def recover_entries(job, queue_entries, run_monitor):
767 reparse_task = FinalReparseTask(queue_entries,
768 run_monitor=run_monitor)
769 self.add_agent(Agent([reparse_task], num_processes=0))
770
771 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
772 orphans, _PARSER_PID_FILE,
773 recover_entries)
774
775
776 def _recover_all_recoverable_entries(self):
777 orphans = _drone_manager.get_orphaned_autoserv_processes()
778 self._recover_running_entries(orphans)
779 self._recover_gathering_entries(orphans)
780 self._recover_parsing_entries(orphans)
781 self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard97aed502008-11-04 02:01:24 +0000783
showard170873e2009-01-07 00:22:26 +0000784 def _requeue_other_active_entries(self):
785 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000786 where='active AND NOT complete AND '
787 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000788 for queue_entry in queue_entries:
789 if self.get_agents_for_entry(queue_entry):
790 # entry has already been recovered
791 continue
showardd3dc1992009-04-22 21:01:40 +0000792 if queue_entry.aborted:
793 queue_entry.abort(self)
794 continue
795
796 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000797 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000798 if queue_entry.host:
799 tasks = queue_entry.host.reverify_tasks()
800 self.add_agent(Agent(tasks))
801 agent = queue_entry.requeue()
802
803
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000819
820
jadmanski0afbb632008-06-06 21:10:57 +0000821 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000822 print_message='Reverifying host %s'):
823 full_where='locked = 0 AND invalid = 0 AND ' + where
824 for host in Host.fetch(where=full_where):
825 if self.host_has_agent(host):
826 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000827 continue
showard170873e2009-01-07 00:22:26 +0000828 if print_message:
showardb18134f2009-03-20 20:52:18 +0000829 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000830 tasks = host.reverify_tasks()
831 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000832
833
jadmanski0afbb632008-06-06 21:10:57 +0000834 def _recover_hosts(self):
835 # recover "Repair Failed" hosts
836 message = 'Reverifying dead host %s'
837 self._reverify_hosts_where("status = 'Repair Failed'",
838 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000839
840
showard04c82c52008-05-29 19:38:12 +0000841
    def _get_pending_queue_entries(self):
        """
        Fetch all queued, inactive, incomplete HostQueueEntries.

        @returns list of HostQueueEntries in scheduling order.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000848
849
showard89f84db2009-03-12 20:39:13 +0000850 def _refresh_pending_queue_entries(self):
851 """
852 Lookup the pending HostQueueEntries and call our HostScheduler
853 refresh() method given that list. Return the list.
854
855 @returns A list of pending HostQueueEntries sorted in priority order.
856 """
showard63a34772008-08-18 19:32:50 +0000857 queue_entries = self._get_pending_queue_entries()
858 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000859 return []
showardb95b1bd2008-08-15 18:11:04 +0000860
showard63a34772008-08-18 19:32:50 +0000861 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000862
showard89f84db2009-03-12 20:39:13 +0000863 return queue_entries
864
865
866 def _schedule_atomic_group(self, queue_entry):
867 """
868 Schedule the given queue_entry on an atomic group of hosts.
869
870 Returns immediately if there are insufficient available hosts.
871
872 Creates new HostQueueEntries based off of queue_entry for the
873 scheduled hosts and starts them all running.
874 """
875 # This is a virtual host queue entry representing an entire
876 # atomic group, find a group and schedule their hosts.
877 group_hosts = self._host_scheduler.find_eligible_atomic_group(
878 queue_entry)
879 if not group_hosts:
880 return
881 # The first assigned host uses the original HostQueueEntry
882 group_queue_entries = [queue_entry]
883 for assigned_host in group_hosts[1:]:
884 # Create a new HQE for every additional assigned_host.
885 new_hqe = HostQueueEntry.clone(queue_entry)
886 new_hqe.save()
887 group_queue_entries.append(new_hqe)
888 assert len(group_queue_entries) == len(group_hosts)
889 for queue_entry, host in itertools.izip(group_queue_entries,
890 group_hosts):
891 self._run_queue_entry(queue_entry, host)
892
893
894 def _schedule_new_jobs(self):
895 queue_entries = self._refresh_pending_queue_entries()
896 if not queue_entries:
897 return
898
showard63a34772008-08-18 19:32:50 +0000899 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000900 if (queue_entry.atomic_group_id is None or
901 queue_entry.host_id is not None):
902 assigned_host = self._host_scheduler.find_eligible_host(
903 queue_entry)
904 if assigned_host:
905 self._run_queue_entry(queue_entry, assigned_host)
906 else:
907 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000908
909
910 def _run_queue_entry(self, queue_entry, host):
911 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000912 # in some cases (synchronous jobs with run_verify=False), agent may be
913 # None
showard9976ce92008-10-15 20:28:13 +0000914 if agent:
915 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000916
917
jadmanski0afbb632008-06-06 21:10:57 +0000918 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000919 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
920 for agent in self.get_agents_for_entry(entry):
921 agent.abort()
922 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000923
924
showard324bf812009-01-20 23:23:38 +0000925 def _can_start_agent(self, agent, num_started_this_cycle,
926 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000927 # always allow zero-process agents to run
928 if agent.num_processes == 0:
929 return True
930 # don't allow any nonzero-process agents to run after we've reached a
931 # limit (this avoids starvation of many-process agents)
932 if have_reached_limit:
933 return False
934 # total process throttling
showard324bf812009-01-20 23:23:38 +0000935 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000936 return False
937 # if a single agent exceeds the per-cycle throttling, still allow it to
938 # run when it's the first agent in the cycle
939 if num_started_this_cycle == 0:
940 return True
941 # per-cycle throttling
942 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000943 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000944 return False
945 return True
946
947
    def _handle_agents(self):
        """
        Advance every agent one step: remove finished agents, start
        not-yet-running agents subject to throttling, and tick the rest.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                # agent hasn't started yet -- apply throttling before letting
                # its first tick() launch processes
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000966
967
    def _process_recurring_runs(self):
        """
        Launch a new job for every RecurringRun whose start date has passed,
        then reschedule or delete the RecurringRun depending on its remaining
        loop count.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): dependencies is fetched but never passed to
            # create_new_job below -- confirm whether recurring jobs are
            # meant to drop their template's dependencies
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one_time_hosts may be None; materialize each as a Host row
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a bad template must not kill the scheduler loop; log and
                # move on to the next recurring run
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # last iteration: retire the recurring run entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1007
1008
class PidfileRunMonitor(object):
    """
    Monitor a process through its pidfile, via the drone manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process (see on_lost_process())
        self.lost_process = False
        # set when run()/attach_to_existing_process() is called; used to
        # time out waiting for the pidfile to appear
        self._start_time = None
        self.pidfile_id = None
        # last pidfile contents read (process, exit_status, num_tests_failed)
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): run() inlines this nice-wrapping instead of calling
        # this helper -- it appears to be unused
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch command via the drone manager and start monitoring it."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its pidfile."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one is known."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Whether a process has been identified for this pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored Process; only valid when has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile contents; raises
        _PidfileException if the contents are invalid."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log, notify by email, and give up on the process
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            # already gave up; _state holds the synthetic failure values
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Only gives up (and notifies) after PIDFILE_TIMEOUT has elapsed
        since monitoring started.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # fabricate a failing exit so callers see a completed, failed run
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the number of failed tests recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001166
1167
class Agent(object):
    """
    Runs an ordered queue of AgentTasks on behalf of the dispatcher.  One
    task is active at a time; if a task fails, the remaining queue is
    dropped and the task's failure_tasks are run in a fresh Agent.
    """
    def __init__(self, tasks, num_processes=1):
        # num_processes is used by the dispatcher for throttling
        self.active_task = None
        self.queue = None
        self.dispatcher = None
        self.num_processes = num_processes

        # union of ids across all tasks, used by the dispatcher to index
        # this agent by host and by queue entry
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        # tasks need the agent to reach the dispatcher (see on_task_failure)
        task.agent = self


    def tick(self):
        """Advance the active task, moving on to queued tasks as each one
        completes, until a task is still in progress or all are done."""
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire a finished active task (triggering failure handling if it
        failed) and pull the next task from the queue, if any."""
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        # drop the rest of this agent's queue
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1244
showardd3dc1992009-04-22 21:01:40 +00001245
class AgentTask(object):
    """
    Base class for a single unit of work run by an Agent -- typically one
    autoserv process, launched and watched through a PidfileRunMonitor.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to execute, or None for tasks that
                don't launch a process themselves.
        @param working_directory: results directory the process runs in.
        @param failure_tasks: tasks to run (in a fresh Agent) if this task
                fails.
        """
        self.done = False
        # don't use a mutable default argument for failure_tasks -- a shared
        # list would alias state across every AgentTask instance
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None  # PidfileRunMonitor, created by run()
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None
        # NOTE(review): pidfile_name and paired_with_pidfile are accepted but
        # never stored; run() takes its own pidfile arguments -- confirm
        # callers don't expect these constructor args to take effect.


    def _set_ids(self, host=None, queue_entries=None):
        """Populate host_ids/queue_entry_ids from queue entries, or from a
        single host when no real queue entries are given."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if necessary, then advance it one step."""
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        """
        Check the monitored process; once it has exited, finish the task
        (success == exit code 0).  A task with no monitor finishes as a
        failure immediately.
        """
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                # process still running
                return
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task done and run the epilog; idempotent."""
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is ignored; the drone manager chooses the path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy this task's log file to the results repository, if any."""
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses extend."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the monitored process (if any) and mark the task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a timestamped per-host log path."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the execution tag shared by all queue_entries, asserting
        that they all agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_results(self, queue_entries, use_monitor=None):
        """Copy the execution results for queue_entries to the results
        repository, using use_monitor's process (defaults to self.monitor)."""
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        """Kick off a zero-process FinalReparseTask for queue_entries."""
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd (if set) under a new PidfileRunMonitor."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001385
1386
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single keyval line as 'key=value'."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override this."""
        # bug fix: this previously raised NotImplemented, which is a constant
        # (not an exception class) and produces a confusing TypeError when
        # raised; NotImplementedError is the correct exception
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval line to the job's keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return  # no process ever ran, so there's nowhere to write keyvals
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time (now) in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1414
1415
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # look up the host's protection level and normalize it to the
        # attribute-name form that autoserv expects
        protection = host_protections.Protection.get_attr_name(
                host_protections.Protection.get_string(host.protection))

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        autoserv_args = ['-R', '--host-protection', protection]
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     autoserv_args, queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # give the entry back to the scheduler while the repair runs
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the associated queue entry as failed and preserve its logs."""
        assert self.queue_entry_to_fail

        entry = self.queue_entry_to_fail
        if entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return  # entry has been aborted

        entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=entry.execution_tag() + '/')

        self._copy_results([entry])
        if entry.job.parse_failed_repair:
            self._parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            return
        self.host.set_status('Repair Failed')
        if self.queue_entry_to_fail:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001486
1487
class PreJobTask(AgentTask):
    """Base for tasks (verify/cleanup) that run before a job on a host."""
    def epilog(self):
        """On failure of a non-metahost entry, preserve this task's log file
        in the entry's results directory so users can see what went wrong."""
        super(PreJobTask, self).epilog()
        if self.success or not self.queue_entry or self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001500
1501
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host; schedules a RepairTask on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        # a failed verify is handled by the RepairTask in failure_tasks
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001532
1533
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv for a job's queue entries and performs the surrounding
    job-level bookkeeping (keyvals, entry/host statuses, abort logging,
    scheduling of log gathering)."""
    def __init__(self, job, queue_entries, cmd, group_name=''):
        """
        @param job - the Job these entries belong to
        @param queue_entries - HostQueueEntries to run; they must all share
                one execution tag
        @param cmd - autoserv command line (None when recovering an existing
                process; see RecoveryQueueTask)
        @param group_name - optional host group name recorded in job keyvals
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        """The job-level keyval file lives at the root of the execution's
        results directory."""
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a keyval file at keyval_path so it is
        written out along with the job's results."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Record the host's platform and labels under host_keyvals/<host>."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all queue entries of one execution share a tag, so the first
        # entry's tag identifies the whole group
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Write initial keyvals and mark entries and hosts as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker in the results directory explaining the failure
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: record the finish time, schedule log
        gathering, and fail all entries if the process was lost."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001661
1662
class RecoveryQueueTask(QueueTask):
    """QueueTask for an autoserv process that was already running when the
    scheduler started (e.g. after a scheduler restart); it reattaches to the
    existing process instead of launching a new one."""
    def __init__(self, job, queue_entries, run_monitor):
        # run_monitor: a monitor already attached to the running process;
        # cmd=None tells the base class there is nothing to launch
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        """Adopt the existing process's monitor; nothing to launch."""
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001676
1677
class PostJobTask(AgentTask):
    """Base class for tasks that run after a job's autoserv process has
    exited: crashinfo/log gathering and the final results reparse."""
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        @param queue_entries - entries of the finished execution; must share
                one execution tag
        @param pidfile_name - pidfile this task's own process will write
        @param logfile_name - name of this task's log file within the
                execution directory
        @param run_monitor - if not None, we're recovering a running task
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name
        self._run_monitor = run_monitor

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process so we can read its exit
        # status and pair our own process with its pidfile
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command line to run.  Subclasses must override this."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the queue entries were aborted.  All entries should
        agree; on inconsistency, notify and assume aborted."""
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # bug fix: previously this joined the characters of a single
                # formatted string; describe each entry instead
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the autoserv outcome to a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        if self._run_monitor is not None:
            # recovery case: reuse the monitor of the already-running task
            self.monitor = self._run_monitor
        else:
            # make sure we actually have results to work with.
            # this should never happen in normal operation.
            if not self._autoserv_monitor.has_process():
                email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
                self.finished(False)
                return

            super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1764
1765
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        hostnames = ','.join(entry.host.hostname
                             for entry in self._queue_entries)
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', hostnames,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Decide whether the job's hosts need a cleanup (reboot) and either
        schedule CleanupTasks or mark the hosts Ready."""
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif self._job.reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif self._job.reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            succeeded = (self._final_status ==
                         models.HostQueueEntry.Status.COMPLETED)
            do_reboot = (succeeded and
                         self._autoserv_monitor.num_tests_failed() == 0)
        else:
            do_reboot = False

        for entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        """Only collect crashinfo if autoserv died abnormally."""
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
            return
        super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00001833
1834
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host; schedules a RepairTask on failure."""
    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_task])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        # a failed cleanup is handled by the RepairTask in failure_tasks
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1866
1867
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a finished job's results directory.
    The number of concurrent parse processes is globally throttled via a
    class-level counter."""
    # number of parse processes currently running, shared by all instances
    _num_running_parses = 0

    def __init__(self, queue_entries, run_monitor=None):
        """
        @param queue_entries - entries whose results should be (re)parsed
        @param run_monitor - if not None, recover an already-running parse
        """
        super(FinalReparseTask, self).__init__(queue_entries,
                                               pidfile_name=_PARSER_PID_FILE,
                                               logfile_name='.parse.log',
                                               run_monitor=run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # global throttle: stay under the configured maximum of concurrent
        # parse processes
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # entries reach their final status only after parsing completes
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        """Build the parser command line for the job's results directory."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Start the parse if we're under the concurrency limit; otherwise
        do nothing and let tick() retry later."""
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a throttle slot if this task actually consumed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001941
1942
class SetEntryPendingTask(AgentTask):
    """Zero-process task that transitions a queue entry to Pending and
    schedules whatever follow-up agent that transition produces."""
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        self.finished(True)
1955
1956
class DBError(Exception):
    """Raised by the DBObject constructor when its database SELECT fails."""
1959
1960
mbligh36768f02008-02-22 18:28:33 +00001961class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001962 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001963
1964 # Subclasses MUST override these:
1965 _table_name = ''
1966 _fields = ()
1967
showarda3c58572009-03-12 20:36:59 +00001968 # A mapping from (type, id) to the instance of the object for that
1969 # particular id. This prevents us from creating new Job() and Host()
1970 # instances for every HostQueueEntry object that we instantiate as
1971 # multiple HQEs often share the same Job.
1972 _instances_by_type_and_id = weakref.WeakValueDictionary()
1973 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001974
showarda3c58572009-03-12 20:36:59 +00001975
1976 def __new__(cls, id=None, **kwargs):
1977 """
1978 Look to see if we already have an instance for this particular type
1979 and id. If so, use it instead of creating a duplicate instance.
1980 """
1981 if id is not None:
1982 instance = cls._instances_by_type_and_id.get((cls, id))
1983 if instance:
1984 return instance
1985 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1986
1987
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - primary key of the row to fetch (exclusive with row)
        @param row - an already-fetched SELECT row (exclusive with id)
        @param new_record - True if this object has not been saved to the
                database yet
        @param always_query - if False and this cached instance was already
                initialized, skip re-querying the database
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a cached instance is being re-queried; log any fields the
            # database changed underneath us before applying the new values
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2016
2017
2018 @classmethod
2019 def _clear_instance_cache(cls):
2020 """Used for testing, clear the internal instance cache."""
2021 cls._instances_by_type_and_id.clear()
2022
2023
showardccbd6c52009-03-21 00:10:21 +00002024 def _fetch_row_from_db(self, row_id):
2025 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2026 rows = _db.execute(sql, (row_id,))
2027 if not rows:
showard76e29d12009-04-15 21:53:10 +00002028 raise DBError("row not found (table=%s, row id=%s)"
2029 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002030 return rows[0]
2031
2032
showarda3c58572009-03-12 20:36:59 +00002033 def _assert_row_length(self, row):
2034 assert len(row) == len(self._fields), (
2035 "table = %s, row = %s/%d, fields = %s/%d" % (
2036 self.__table, row, len(row), self._fields, len(self._fields)))
2037
2038
2039 def _compare_fields_in_row(self, row):
2040 """
2041 Given a row as returned by a SELECT query, compare it to our existing
2042 in memory fields.
2043
2044 @param row - A sequence of values corresponding to fields named in
2045 The class attribute _fields.
2046
2047 @returns A dictionary listing the differences keyed by field name
2048 containing tuples of (current_value, row_value).
2049 """
2050 self._assert_row_length(row)
2051 differences = {}
2052 for field, row_value in itertools.izip(self._fields, row):
2053 current_value = getattr(self, field)
2054 if current_value != row_value:
2055 differences[field] = (current_value, row_value)
2056 return differences
showard2bab8f42008-11-12 18:15:22 +00002057
2058
2059 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002060 """
2061 Update our field attributes using a single row returned by SELECT.
2062
2063 @param row - A sequence of values corresponding to fields named in
2064 the class fields list.
2065 """
2066 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002067
showard2bab8f42008-11-12 18:15:22 +00002068 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002069 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002070 setattr(self, field, value)
2071 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002072
showard2bab8f42008-11-12 18:15:22 +00002073 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002074
mblighe2586682008-02-29 22:45:46 +00002075
showardccbd6c52009-03-21 00:10:21 +00002076 def update_from_database(self):
2077 assert self.id is not None
2078 row = self._fetch_row_from_db(self.id)
2079 self._update_fields_from_row(row)
2080
2081
jadmanski0afbb632008-06-06 21:10:57 +00002082 def count(self, where, table = None):
2083 if not table:
2084 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002085
jadmanski0afbb632008-06-06 21:10:57 +00002086 rows = _db.execute("""
2087 SELECT count(*) FROM %s
2088 WHERE %s
2089 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002090
jadmanski0afbb632008-06-06 21:10:57 +00002091 assert len(rows) == 1
2092
2093 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002094
2095
showardd3dc1992009-04-22 21:01:40 +00002096 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002097 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002098
showard2bab8f42008-11-12 18:15:22 +00002099 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002100 return
mbligh36768f02008-02-22 18:28:33 +00002101
mblighf8c624d2008-07-03 16:58:45 +00002102 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002103 _db.execute(query, (value, self.id))
2104
showard2bab8f42008-11-12 18:15:22 +00002105 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002106
2107
    def save(self):
        """
        INSERT this instance as a new row and pick up the assigned id.

        Only acts when the instance was created with new_record=True;
        existing rows are written through update_field() instead.
        """
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    # NOTE(review): values are interpolated directly into the
                    # SQL with '"%s"' quoting; a value containing a double
                    # quote or backslash would break the statement.  Consider
                    # a parameterized INSERT (with NULL handled via None).
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002125
2126
jadmanski0afbb632008-06-06 21:10:57 +00002127 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002128 self._instances_by_type_and_id.pop((type(self), id), None)
2129 self._initialized = False
2130 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002131 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2132 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002133
2134
showard63a34772008-08-18 19:32:50 +00002135 @staticmethod
2136 def _prefix_with(string, prefix):
2137 if string:
2138 string = prefix + string
2139 return string
2140
2141
jadmanski0afbb632008-06-06 21:10:57 +00002142 @classmethod
showard989f25d2008-10-01 11:38:11 +00002143 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002144 """
2145 Construct instances of our class based on the given database query.
2146
2147 @yields One class instance for each row fetched.
2148 """
showard63a34772008-08-18 19:32:50 +00002149 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2150 where = cls._prefix_with(where, 'WHERE ')
2151 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002152 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002153 'joins' : joins,
2154 'where' : where,
2155 'order_by' : order_by})
2156 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002157 for row in rows:
2158 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002159
mbligh36768f02008-02-22 18:28:33 +00002160
class IneligibleHostQueue(DBObject):
    """
    A (job, host) exclusion row: while present, the host will not be
    scheduled again for that job (created by HostQueueEntry.block_host()).
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002164
2165
class AtomicGroup(DBObject):
    """
    A row from the atomic_groups table.  max_number_of_machines is used by
    Job._choose_group_to_run() as the number of entries to gather when the
    job runs against this group.
    """
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002170
2171
class Label(DBObject):
    """
    A row from the labels table; atomic_group_id links a label to the
    atomic group it implies (see HostQueueEntry.get_labels()).
    """
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002176
2177
class Host(DBObject):
    """A row from the hosts table, plus scheduler-side helpers."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return the active, incomplete HostQueueEntry for this host, if any."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if not rows:
            return None
        # A host should only ever be working on one entry at a time.
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever queue entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self, status):
        """Write a new status for this host, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        all_labels = [name for name, _ in rows]
        platform = None
        for name, is_platform in rows:
            if is_platform:
                platform = name
        return platform, all_labels


    def reverify_tasks(self):
        """Return [cleanup, verify] tasks for this host, marking it busy."""
        tasks = [CleanupTask(host=self), VerifyTask(host=self)]
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return tasks
showardd8e548a2008-09-09 03:04:57 +00002234
2235
class HostQueueEntry(DBObject):
    """
    A single (job, host) scheduling unit, backed by host_queue_entries.

    Tracks the entry's status transitions (writing active/complete flags as
    a side effect), host assignment and host blocking, and sends status
    e-mail when the status matches _notify_email_statuses.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        # Eagerly load the owning Job and, when assigned, the Host.
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log of scheduling decisions, kept under the job's tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL for this entry's job, used in notification e-mail.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign *host* to this entry (marking it active and blocking the
        host for this job), or release the current host when *host* is None.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        # May be None when no host has been assigned yet.
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row for (this job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        # Safe accessor for display purposes when no host is assigned.
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Write a new status and derive the active/complete flags from it;
        may trigger status / job-complete notification e-mail.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """E-mail the job's notification list about this status change."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """When the whole job has finished, e-mail a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Run this entry's job, first binding *assigned_host* when the entry
        is a meta-host or atomic-group entry.  Delegates to Job.run().
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the queue (releasing a meta-host's host)."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None (lazy-loaded).
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None (lazy-loaded).
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """
        Abort this entry.  If a job agent is currently running it, defer to
        the agent's post-job tasks; otherwise release/reverify the host as
        appropriate and mark the entry Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING,
                                Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        # "<job id>-<owner>/<subdir>"; only valid once the subdir is set.
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002509
2510
class Job(DBObject):
    """
    A row from the jobs table, with the scheduler-side logic for choosing
    host groups, building the autoserv command line and launching the job
    via an Agent.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # NOTE(review): control_type 2 appears to denote a client-side job;
        # confirm against the frontend's control-type enum.
        return self.control_type != 2


    def tag(self):
        """Results-directory tag for this job: "<id>-<owner>"."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status, optionally cascading to all its entries."""
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                               status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by *clause*."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        # Finished == every entry has reached a complete status.
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Django queryset of this job's Queued/Pending (/Verifying) entries."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-run entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job when too few entries remain to meet synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Stage the control file with the drone manager; returns its path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing a group member's execution_subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command-line argument list for these entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Whether a CleanupTask is needed, per the reboot_before setting."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Whether to verify the host (honoring DO_NOT_VERIFY protection)."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Tasks to run before the job: [cleanup?] [verify?] set-pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """Give all *queue_entries* a common results execution_subdir."""
        if len(queue_entries) == 1:
            # Single-host groups use the hostname directly.
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, and a string group name to suggest
                giving to this job in the results database.
        """
        if include_queue_entry.atomic_group_id:
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # Atomic groups may span more machines than the job's synch_count.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id))
            # TODO(gps): sort these by hostname before slicing.
            chosen_entries += list(pending_entries)[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run(self, queue_entry):
        """
        Run this job for *queue_entry*: pre-job tasks (cleanup/verify) when
        not yet ready, otherwise pick a host group and launch.  Returns the
        Agent that will carry out the work.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark entries Starting and return an Agent running the QueueTask."""
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        # NOTE(review): entry_ids is computed but never used below.
        entry_ids = [entry.id for entry in queue_entries]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002775
2776
if __name__ == '__main__':
    # Script entry point; main() is expected to be defined earlier in this
    # module (outside this chunk).
    main()