blob: 7a23ace1aa05122a005bc2b0603a9967e7ab4510 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# ('KEY' in os.environ) replaces the deprecated dict.has_key() here and
# below; behavior is identical and it remains valid on newer Pythons.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names dropped into each execution's results directory by the
# autoserv run, crashinfo collection and results-parsing stages
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state; filled in by init() / main()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Top-level entry point: run the scheduler, logging any fatal exception."""
    try:
        main_without_exception_handling()
    except:
        # Log the full traceback, then re-raise so the process still exits
        # with the original error.
        logging.exception('Exception escaping in monitor_db')
        raise
88
89
def main_without_exception_handling():
    """
    Parse command-line options, perform one-time setup (site hook, email
    notification config, base URL, status server) and run the dispatcher
    loop until a shutdown is requested (see handle_sigint).
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the optional site-specific initialization hook (falls back to the
    # no-op dummy when no site module is present).
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Statuses may be separated by whitespace, commas, semicolons or colons.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # Test mode: swap in the dummy autoserv and skip results parsing.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop; handle_sigint flips _shutdown to end it.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush email, stop the status server, shut down the
    # drone manager and disconnect from the database.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main dispatcher loop to exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000176
177
def init(logfile):
    """
    One-time scheduler startup: logging redirection, pidfile, database
    connection, SIGINT handler and drone manager initialization.

    @param logfile - Optional path; when set, stdout/stderr are redirected
            there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # in test mode, point at the stress-test database instead
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Named 'drones_setting' (not 'drones') to avoid shadowing the 'drones'
    # module imported at the top of this file.
    drones_setting = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones_setting.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000209
210
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile*.err.

    Both files are opened for append, unbuffered, and the redirection is
    done at the file-descriptor level as well as the Python object level.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    # fd-level redirection so spawned subprocesses inherit it
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    # object-level redirection for code writing via sys.stdout / sys.stderr
    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000223
224
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns list of command-line tokens.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # fall back to the queue entry's job when none was given explicitly
        effective_job = job if job else queue_entry.job
        command += ['-u', effective_job.owner, '-l', effective_job.name]
    return command + extra_args
246
247
class SchedulerError(Exception):
    """Error raised by HostScheduler upon detecting an inconsistent state."""
250
251
class HostScheduler(object):
    """
    Matches pending host queue entries to eligible, ready hosts.

    refresh() bulk-loads all scheduling state (available hosts, ACLs,
    labels, job dependencies) from the database once per scheduling pass;
    the find_eligible_* methods then consult and mutate that in-memory
    state as hosts are claimed.
    """

    def _get_ready_hosts(self):
        """Return {host id: Host} for all unlocked, idle, Ready hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute *query* (which contains a %s placeholder for an id list) and
        fold the two-column result into a dict of id -> set of related ids.
        With flip=True the mapping direction is reversed.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left, right) id rows into {left: set(right)} (or flipped)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids its owner belongs to."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it is barred from using."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label id -> host ids, host id -> label ids) for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all scheduling state for this pass from the database."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency and atomic group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's assigned host, or None if ineligible."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim and return any eligible host in the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for a non-atomic-group queue entry."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group = sorted(eligible_hosts_in_group)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
598
599
showard170873e2009-01-07 00:22:26 +0000600class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000601 def __init__(self):
602 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000603 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000604 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000605 user_cleanup_time = scheduler_config.config.clean_interval
606 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
607 _db, user_cleanup_time)
608 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000609 self._host_agents = {}
610 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000611
mbligh36768f02008-02-22 18:28:33 +0000612
    def initialize(self, recover_hosts=True):
        """Prepare cleanup tasks and recover state left over from a prior run.

        @param recover_hosts - When True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000622
623
    def tick(self):
        """
        One scheduler iteration: refresh drone state, run cleanups, process
        aborts and recurring runs, schedule new jobs, drive agents, then
        flush pending drone actions and queued email.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000633
showard97aed502008-11-04 02:01:24 +0000634
    def _run_cleanup(self):
        """Give the user-level and 24-hour cleanup tasks a chance to run."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000638
mbligh36768f02008-02-22 18:28:33 +0000639
showard170873e2009-01-07 00:22:26 +0000640 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
641 for object_id in object_ids:
642 agent_dict.setdefault(object_id, set()).add(agent)
643
644
645 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
646 for object_id in object_ids:
647 assert object_id in agent_dict
648 agent_dict[object_id].remove(agent)
649
650
    def add_agent(self, agent):
        """Start tracking an Agent, indexing it by its hosts and queue entries."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000657
showard170873e2009-01-07 00:22:26 +0000658
659 def get_agents_for_entry(self, queue_entry):
660 """
661 Find agents corresponding to the specified queue_entry.
662 """
showardd3dc1992009-04-22 21:01:40 +0000663 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000664
665
666 def host_has_agent(self, host):
667 """
668 Determine if there is currently an Agent present using this host.
669 """
670 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000671
672
    def remove_agent(self, agent):
        """Stop tracking an Agent and remove its host/queue-entry indexing."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000679
680
    def _recover_processes(self):
        """
        Recover state from a previous scheduler run: register pidfiles,
        re-attach to recoverable entries, requeue the rest and reverify
        any remaining hosts.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000691
showard170873e2009-01-07 00:22:26 +0000692
693 def _register_pidfiles(self):
694 # during recovery we may need to read pidfiles for both running and
695 # parsing entries
696 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000697 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000698 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000699 for pidfile_name in _ALL_PIDFILE_NAMES:
700 pidfile_id = _drone_manager.get_pidfile_id_from(
701 queue_entry.execution_tag(), pidfile_name=pidfile_name)
702 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000703
704
showardd3dc1992009-04-22 21:01:40 +0000705 def _recover_entries_with_status(self, status, orphans, pidfile_name,
706 recover_entries_fn):
707 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000708 for queue_entry in queue_entries:
709 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000710 # synchronous job we've already recovered
711 continue
showardd3dc1992009-04-22 21:01:40 +0000712 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000713 execution_tag = queue_entry.execution_tag()
714 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000715 run_monitor.attach_to_existing_process(execution_tag,
716 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000717
718 log_message = ('Recovering %s entry %s ' %
719 (status.lower(),
720 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000721 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000722 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000723 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000724 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000725 continue
mbligh90a549d2008-03-25 23:52:34 +0000726
showard597bfd32009-05-08 18:22:50 +0000727 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000728 run_monitor.get_process())
729 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
730 orphans.discard(run_monitor.get_process())
731
732
733 def _kill_remaining_orphan_processes(self, orphans):
734 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000735 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000736 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000737
showard170873e2009-01-07 00:22:26 +0000738
showardd3dc1992009-04-22 21:01:40 +0000739 def _recover_running_entries(self, orphans):
740 def recover_entries(job, queue_entries, run_monitor):
741 if run_monitor is not None:
742 queue_task = RecoveryQueueTask(job=job,
743 queue_entries=queue_entries,
744 run_monitor=run_monitor)
745 self.add_agent(Agent(tasks=[queue_task],
746 num_processes=len(queue_entries)))
747 # else, _requeue_other_active_entries will cover this
748
749 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
750 orphans, '.autoserv_execute',
751 recover_entries)
752
753
754 def _recover_gathering_entries(self, orphans):
755 def recover_entries(job, queue_entries, run_monitor):
756 gather_task = GatherLogsTask(job, queue_entries,
757 run_monitor=run_monitor)
758 self.add_agent(Agent([gather_task]))
759
760 self._recover_entries_with_status(
761 models.HostQueueEntry.Status.GATHERING,
762 orphans, _CRASHINFO_PID_FILE, recover_entries)
763
764
765 def _recover_parsing_entries(self, orphans):
766 def recover_entries(job, queue_entries, run_monitor):
767 reparse_task = FinalReparseTask(queue_entries,
768 run_monitor=run_monitor)
769 self.add_agent(Agent([reparse_task], num_processes=0))
770
771 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
772 orphans, _PARSER_PID_FILE,
773 recover_entries)
774
775
    def _recover_all_recoverable_entries(self):
        """
        Recover Running, Gathering and Parsing entries, then kill any
        autoserv processes that no recovered entry claimed.  Each recovery
        step discards the processes it re-attaches to from `orphans`, so the
        kill pass must run last.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000782
showard97aed502008-11-04 02:01:24 +0000783
showard170873e2009-01-07 00:22:26 +0000784 def _requeue_other_active_entries(self):
785 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000786 where='active AND NOT complete AND '
787 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000788 for queue_entry in queue_entries:
789 if self.get_agents_for_entry(queue_entry):
790 # entry has already been recovered
791 continue
showardd3dc1992009-04-22 21:01:40 +0000792 if queue_entry.aborted:
793 queue_entry.abort(self)
794 continue
795
796 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000797 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000798 if queue_entry.host:
799 tasks = queue_entry.host.reverify_tasks()
800 self.add_agent(Agent(tasks))
801 agent = queue_entry.requeue()
802
803
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000819
820
jadmanski0afbb632008-06-06 21:10:57 +0000821 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000822 print_message='Reverifying host %s'):
823 full_where='locked = 0 AND invalid = 0 AND ' + where
824 for host in Host.fetch(where=full_where):
825 if self.host_has_agent(host):
826 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000827 continue
showard170873e2009-01-07 00:22:26 +0000828 if print_message:
showardb18134f2009-03-20 20:52:18 +0000829 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000830 tasks = host.reverify_tasks()
831 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000832
833
jadmanski0afbb632008-06-06 21:10:57 +0000834 def _recover_hosts(self):
835 # recover "Repair Failed" hosts
836 message = 'Reverifying dead host %s'
837 self._reverify_hosts_where("status = 'Repair Failed'",
838 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000839
840
showard04c82c52008-05-29 19:38:12 +0000841
showardb95b1bd2008-08-15 18:11:04 +0000842 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000843 # prioritize by job priority, then non-metahost over metahost, then FIFO
844 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000845 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000846 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000847 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000848
849
showard89f84db2009-03-12 20:39:13 +0000850 def _refresh_pending_queue_entries(self):
851 """
852 Lookup the pending HostQueueEntries and call our HostScheduler
853 refresh() method given that list. Return the list.
854
855 @returns A list of pending HostQueueEntries sorted in priority order.
856 """
showard63a34772008-08-18 19:32:50 +0000857 queue_entries = self._get_pending_queue_entries()
858 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000859 return []
showardb95b1bd2008-08-15 18:11:04 +0000860
showard63a34772008-08-18 19:32:50 +0000861 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000862
showard89f84db2009-03-12 20:39:13 +0000863 return queue_entries
864
865
866 def _schedule_atomic_group(self, queue_entry):
867 """
868 Schedule the given queue_entry on an atomic group of hosts.
869
870 Returns immediately if there are insufficient available hosts.
871
872 Creates new HostQueueEntries based off of queue_entry for the
873 scheduled hosts and starts them all running.
874 """
875 # This is a virtual host queue entry representing an entire
876 # atomic group, find a group and schedule their hosts.
877 group_hosts = self._host_scheduler.find_eligible_atomic_group(
878 queue_entry)
879 if not group_hosts:
880 return
881 # The first assigned host uses the original HostQueueEntry
882 group_queue_entries = [queue_entry]
883 for assigned_host in group_hosts[1:]:
884 # Create a new HQE for every additional assigned_host.
885 new_hqe = HostQueueEntry.clone(queue_entry)
886 new_hqe.save()
887 group_queue_entries.append(new_hqe)
888 assert len(group_queue_entries) == len(group_hosts)
889 for queue_entry, host in itertools.izip(group_queue_entries,
890 group_hosts):
891 self._run_queue_entry(queue_entry, host)
892
893
894 def _schedule_new_jobs(self):
895 queue_entries = self._refresh_pending_queue_entries()
896 if not queue_entries:
897 return
898
showard63a34772008-08-18 19:32:50 +0000899 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000900 if (queue_entry.atomic_group_id is None or
901 queue_entry.host_id is not None):
902 assigned_host = self._host_scheduler.find_eligible_host(
903 queue_entry)
904 if assigned_host:
905 self._run_queue_entry(queue_entry, assigned_host)
906 else:
907 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000908
909
    def _run_queue_entry(self, queue_entry, host):
        """Start queue_entry on the assigned host and track the new agent."""
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000916
917
jadmanski0afbb632008-06-06 21:10:57 +0000918 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000919 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
920 for agent in self.get_agents_for_entry(entry):
921 agent.abort()
922 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000923
924
showard324bf812009-01-20 23:23:38 +0000925 def _can_start_agent(self, agent, num_started_this_cycle,
926 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000927 # always allow zero-process agents to run
928 if agent.num_processes == 0:
929 return True
930 # don't allow any nonzero-process agents to run after we've reached a
931 # limit (this avoids starvation of many-process agents)
932 if have_reached_limit:
933 return False
934 # total process throttling
showard324bf812009-01-20 23:23:38 +0000935 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000936 return False
937 # if a single agent exceeds the per-cycle throttling, still allow it to
938 # run when it's the first agent in the cycle
939 if num_started_this_cycle == 0:
940 return True
941 # per-cycle throttling
942 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000943 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000944 return False
945 return True
946
947
jadmanski0afbb632008-06-06 21:10:57 +0000948 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000949 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000950 have_reached_limit = False
951 # iterate over copy, so we can remove agents during iteration
952 for agent in list(self._agents):
953 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000954 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000955 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000956 continue
957 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000958 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000959 have_reached_limit):
960 have_reached_limit = True
961 continue
showard4c5374f2008-09-04 17:02:56 +0000962 num_started_this_cycle += agent.num_processes
963 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000964 logging.info('%d running processes',
965 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000966
967
showard29f7cd22009-04-29 21:16:24 +0000968 def _process_recurring_runs(self):
969 recurring_runs = models.RecurringRun.objects.filter(
970 start_date__lte=datetime.datetime.now())
971 for rrun in recurring_runs:
972 # Create job from template
973 job = rrun.job
974 info = rpc_utils.get_job_info(job)
975
976 host_objects = info['hosts']
977 one_time_hosts = info['one_time_hosts']
978 metahost_objects = info['meta_hosts']
979 dependencies = info['dependencies']
980 atomic_group = info['atomic_group']
981
982 for host in one_time_hosts or []:
983 this_host = models.Host.create_one_time_host(host.hostname)
984 host_objects.append(this_host)
985
986 try:
987 rpc_utils.create_new_job(owner=rrun.owner.login,
988 host_objects=host_objects,
989 metahost_objects=metahost_objects,
990 name=job.name,
991 priority=job.priority,
992 control_file=job.control_file,
993 control_type=job.control_type,
994 is_template=False,
995 synch_count=job.synch_count,
996 timeout=job.timeout,
997 run_verify=job.run_verify,
998 email_list=job.email_list,
999 dependencies=dependencies,
1000 reboot_before=job.reboot_before,
1001 reboot_after=job.reboot_after,
1002 atomic_group=atomic_group)
1003
1004 except Exception, ex:
1005 logging.exception(ex)
1006 #TODO send email
1007
1008 if rrun.loop_count == 1:
1009 rrun.delete()
1010 else:
1011 if rrun.loop_count != 0: # if not infinite loop
1012 # calculate new start_date
1013 difference = datetime.timedelta(seconds=rrun.loop_period)
1014 rrun.start_date = rrun.start_date + difference
1015 rrun.loop_count -= 1
1016 rrun.save()
1017
1018
showard170873e2009-01-07 00:22:26 +00001019class PidfileRunMonitor(object):
1020 """
1021 Client must call either run() to start a new process or
1022 attach_to_existing_process().
1023 """
mbligh36768f02008-02-22 18:28:33 +00001024
showard170873e2009-01-07 00:22:26 +00001025 class _PidfileException(Exception):
1026 """
1027 Raised when there's some unexpected behavior with the pid file, but only
1028 used internally (never allowed to escape this class).
1029 """
mbligh36768f02008-02-22 18:28:33 +00001030
1031
showard170873e2009-01-07 00:22:26 +00001032 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001033 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001034 self._start_time = None
1035 self.pidfile_id = None
1036 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001037
1038
showard170873e2009-01-07 00:22:26 +00001039 def _add_nice_command(self, command, nice_level):
1040 if not nice_level:
1041 return command
1042 return ['nice', '-n', str(nice_level)] + command
1043
1044
1045 def _set_start_time(self):
1046 self._start_time = time.time()
1047
1048
1049 def run(self, command, working_directory, nice_level=None, log_file=None,
1050 pidfile_name=None, paired_with_pidfile=None):
1051 assert command is not None
1052 if nice_level is not None:
1053 command = ['nice', '-n', str(nice_level)] + command
1054 self._set_start_time()
1055 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001056 command, working_directory, pidfile_name=pidfile_name,
1057 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001058
1059
showardd3dc1992009-04-22 21:01:40 +00001060 def attach_to_existing_process(self, execution_tag,
1061 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001062 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001063 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1064 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001065 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001066
1067
jadmanski0afbb632008-06-06 21:10:57 +00001068 def kill(self):
showard170873e2009-01-07 00:22:26 +00001069 if self.has_process():
1070 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001071
mbligh36768f02008-02-22 18:28:33 +00001072
showard170873e2009-01-07 00:22:26 +00001073 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001074 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001075 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001076
1077
showard170873e2009-01-07 00:22:26 +00001078 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001079 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001080 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001081 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001082
1083
showard170873e2009-01-07 00:22:26 +00001084 def _read_pidfile(self, use_second_read=False):
1085 assert self.pidfile_id is not None, (
1086 'You must call run() or attach_to_existing_process()')
1087 contents = _drone_manager.get_pidfile_contents(
1088 self.pidfile_id, use_second_read=use_second_read)
1089 if contents.is_invalid():
1090 self._state = drone_manager.PidfileContents()
1091 raise self._PidfileException(contents)
1092 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001093
1094
showard21baa452008-10-21 00:08:39 +00001095 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001096 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1097 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001098 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001099 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001100 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001101
1102
1103 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001104 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001105 return
mblighbb421852008-03-11 22:36:16 +00001106
showard21baa452008-10-21 00:08:39 +00001107 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001108
showard170873e2009-01-07 00:22:26 +00001109 if self._state.process is None:
1110 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001111 return
mbligh90a549d2008-03-25 23:52:34 +00001112
showard21baa452008-10-21 00:08:39 +00001113 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001114 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001115 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001116 return
mbligh90a549d2008-03-25 23:52:34 +00001117
showard170873e2009-01-07 00:22:26 +00001118 # pid but no running process - maybe process *just* exited
1119 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001120 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001121 # autoserv exited without writing an exit code
1122 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001123 self._handle_pidfile_error(
1124 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001125
showard21baa452008-10-21 00:08:39 +00001126
1127 def _get_pidfile_info(self):
1128 """\
1129 After completion, self._state will contain:
1130 pid=None, exit_status=None if autoserv has not yet run
1131 pid!=None, exit_status=None if autoserv is running
1132 pid!=None, exit_status!=None if autoserv has completed
1133 """
1134 try:
1135 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001136 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001137 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001138
1139
showard170873e2009-01-07 00:22:26 +00001140 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001141 """\
1142 Called when no pidfile is found or no pid is in the pidfile.
1143 """
showard170873e2009-01-07 00:22:26 +00001144 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001145 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001146 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1147 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001148 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001149 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001150
1151
showard35162b02009-03-03 02:17:30 +00001152 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001153 """\
1154 Called when autoserv has exited without writing an exit status,
1155 or we've timed out waiting for autoserv to write a pid to the
1156 pidfile. In either case, we just return failure and the caller
1157 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001158
showard170873e2009-01-07 00:22:26 +00001159 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001160 """
1161 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001162 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001163 self._state.exit_status = 1
1164 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001168 self._get_pidfile_info()
1169 return self._state.exit_status
1170
1171
1172 def num_tests_failed(self):
1173 self._get_pidfile_info()
1174 assert self._state.num_tests_failed is not None
1175 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001176
1177
mbligh36768f02008-02-22 18:28:33 +00001178class Agent(object):
showard170873e2009-01-07 00:22:26 +00001179 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001180 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001181 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001182 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001183 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001184
showard170873e2009-01-07 00:22:26 +00001185 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1186 for task in tasks)
1187 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1188
showardd3dc1992009-04-22 21:01:40 +00001189 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001190 for task in tasks:
1191 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showardd3dc1992009-04-22 21:01:40 +00001194 def _clear_queue(self):
1195 self.queue = Queue.Queue(0)
1196
1197
showard170873e2009-01-07 00:22:26 +00001198 def _union_ids(self, id_lists):
1199 return set(itertools.chain(*id_lists))
1200
1201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def add_task(self, task):
1203 self.queue.put_nowait(task)
1204 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001205
1206
jadmanski0afbb632008-06-06 21:10:57 +00001207 def tick(self):
showard21baa452008-10-21 00:08:39 +00001208 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001209 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001210 self.active_task.poll()
1211 if not self.active_task.is_done():
1212 return
1213 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001214
1215
jadmanski0afbb632008-06-06 21:10:57 +00001216 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001217 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001218 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001219 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001220 if not self.active_task.success:
1221 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001222 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001223
jadmanski0afbb632008-06-06 21:10:57 +00001224 if not self.is_done():
1225 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001226
1227
jadmanski0afbb632008-06-06 21:10:57 +00001228 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001229 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001230 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1231 # get reset.
1232 new_agent = Agent(self.active_task.failure_tasks)
1233 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001234
mblighe2586682008-02-29 22:45:46 +00001235
showard4c5374f2008-09-04 17:02:56 +00001236 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001237 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001238
1239
jadmanski0afbb632008-06-06 21:10:57 +00001240 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001241 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001242
1243
showardd3dc1992009-04-22 21:01:40 +00001244 def abort(self):
showard08a36412009-05-05 01:01:13 +00001245 # abort tasks until the queue is empty or a task ignores the abort
1246 while not self.is_done():
1247 if not self.active_task:
1248 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001249 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001250 if not self.active_task.aborted:
1251 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001252 return
1253 self.active_task = None
1254
showardd3dc1992009-04-22 21:01:40 +00001255
mbligh36768f02008-02-22 18:28:33 +00001256class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001257 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1258 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001259 self.done = False
1260 self.failure_tasks = failure_tasks
1261 self.started = False
1262 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001263 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001264 self.task = None
1265 self.agent = None
1266 self.monitor = None
1267 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001268 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001269 self.queue_entry_ids = []
1270 self.host_ids = []
1271 self.log_file = None
1272
1273
1274 def _set_ids(self, host=None, queue_entries=None):
1275 if queue_entries and queue_entries != [None]:
1276 self.host_ids = [entry.host.id for entry in queue_entries]
1277 self.queue_entry_ids = [entry.id for entry in queue_entries]
1278 else:
1279 assert host
1280 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def poll(self):
showard08a36412009-05-05 01:01:13 +00001284 if not self.started:
1285 self.start()
1286 self.tick()
1287
1288
1289 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001290 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001291 exit_code = self.monitor.exit_code()
1292 if exit_code is None:
1293 return
1294 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001295 else:
1296 success = False
mbligh36768f02008-02-22 18:28:33 +00001297
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001299
1300
jadmanski0afbb632008-06-06 21:10:57 +00001301 def is_done(self):
1302 return self.done
mbligh36768f02008-02-22 18:28:33 +00001303
1304
jadmanski0afbb632008-06-06 21:10:57 +00001305 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001306 if self.done:
1307 return
jadmanski0afbb632008-06-06 21:10:57 +00001308 self.done = True
1309 self.success = success
1310 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001311
1312
jadmanski0afbb632008-06-06 21:10:57 +00001313 def prolog(self):
1314 pass
mblighd64e5702008-04-04 21:39:28 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001318 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001319
mbligh36768f02008-02-22 18:28:33 +00001320
jadmanski0afbb632008-06-06 21:10:57 +00001321 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001322 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001323 _drone_manager.copy_to_results_repository(
1324 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def epilog(self):
1328 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001329
1330
jadmanski0afbb632008-06-06 21:10:57 +00001331 def start(self):
1332 assert self.agent
1333
1334 if not self.started:
1335 self.prolog()
1336 self.run()
1337
1338 self.started = True
1339
1340
1341 def abort(self):
1342 if self.monitor:
1343 self.monitor.kill()
1344 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001345 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001346 self.cleanup()
1347
1348
showard170873e2009-01-07 00:22:26 +00001349 def set_host_log_file(self, base_name, host):
1350 filename = '%s.%s' % (time.time(), base_name)
1351 self.log_file = os.path.join('hosts', host.hostname, filename)
1352
1353
showardde634ee2009-01-30 01:44:24 +00001354 def _get_consistent_execution_tag(self, queue_entries):
1355 first_execution_tag = queue_entries[0].execution_tag()
1356 for queue_entry in queue_entries[1:]:
1357 assert queue_entry.execution_tag() == first_execution_tag, (
1358 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1359 queue_entry,
1360 first_execution_tag,
1361 queue_entries[0]))
1362 return first_execution_tag
1363
1364
showarda1e74b32009-05-12 17:32:04 +00001365 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001366 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001367 if use_monitor is None:
1368 assert self.monitor
1369 use_monitor = self.monitor
1370 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001371 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001372 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001373 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001374 results_path)
showardde634ee2009-01-30 01:44:24 +00001375
showarda1e74b32009-05-12 17:32:04 +00001376
1377 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001378 reparse_task = FinalReparseTask(queue_entries)
1379 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1380
1381
showarda1e74b32009-05-12 17:32:04 +00001382 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1383 self._copy_results(queue_entries, use_monitor)
1384 self._parse_results(queue_entries)
1385
1386
showardd3dc1992009-04-22 21:01:40 +00001387 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001388 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001389 self.monitor = PidfileRunMonitor()
1390 self.monitor.run(self.cmd, self._working_directory,
1391 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001392 log_file=self.log_file,
1393 pidfile_name=pidfile_name,
1394 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001395
1396
showardd9205182009-04-27 20:09:55 +00001397class TaskWithJobKeyvals(object):
1398 """AgentTask mixin providing functionality to help with job keyval files."""
1399 _KEYVAL_FILE = 'keyval'
1400 def _format_keyval(self, key, value):
1401 return '%s=%s' % (key, value)
1402
1403
1404 def _keyval_path(self):
1405 """Subclasses must override this"""
1406 raise NotImplemented
1407
1408
1409 def _write_keyval_after_job(self, field, value):
1410 assert self.monitor
1411 if not self.monitor.has_process():
1412 return
1413 _drone_manager.write_lines_to_file(
1414 self._keyval_path(), [self._format_keyval(field, value)],
1415 paired_with_process=self.monitor.get_process())
1416
1417
1418 def _job_queued_keyval(self, job):
1419 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1420
1421
1422 def _write_job_finished(self):
1423 self._write_keyval_after_job("job_finished", int(time.time()))
1424
1425
class RepairTask(AgentTask, TaskWithJobKeyvals):
    # Runs "autoserv -R" against a host; on failure of the repair itself,
    # optionally fails out the queue entry that triggered it.
    def __init__(self, host, queue_entry=None):
        """\
        host: the host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair results go to a temp dir; they're copied into the entry's
        # execution tag later only if we need to fail the entry
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # release the entry back to the queue while the host is being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live alongside the temporary repair results
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark queue_entry_to_fail as failed and preserve the repair logs."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            # repair failed, so the triggering entry can never run on this host
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001496
1497
showard8fe93b52008-11-18 17:53:22 +00001498class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001499 def epilog(self):
1500 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001501 should_copy_results = (self.queue_entry and not self.success
1502 and not self.queue_entry.meta_host)
1503 if should_copy_results:
1504 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001505 destination = os.path.join(self.queue_entry.execution_tag(),
1506 os.path.basename(self.log_file))
1507 _drone_manager.copy_to_results_repository(
1508 self.monitor.get_process(), self.log_file,
1509 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001510
1511
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host, falling back to a repair on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host may be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = _autoserv_command_line(self.host.hostname,
                                         self.temp_results_dir, ['-v'],
                                         queue_entry=queue_entry)
        # if the verify fails, try to repair the host
        repair_fallback = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=repair_fallback)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        # a successful verify leaves the host ready for scheduling
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001542
1543
showardd9205182009-04-27 20:09:55 +00001544class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001545 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001546 self.job = job
1547 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001548 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001549 super(QueueTask, self).__init__(cmd, self._execution_tag())
1550 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001551
1552
showard73ec0442009-02-07 02:05:20 +00001553 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001554 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001555
1556
1557 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1558 keyval_contents = '\n'.join(self._format_keyval(key, value)
1559 for key, value in keyval_dict.iteritems())
1560 # always end with a newline to allow additional keyvals to be written
1561 keyval_contents += '\n'
1562 _drone_manager.attach_file_to_execution(self._execution_tag(),
1563 keyval_contents,
1564 file_path=keyval_path)
1565
1566
1567 def _write_keyvals_before_job(self, keyval_dict):
1568 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1569
1570
showard170873e2009-01-07 00:22:26 +00001571 def _write_host_keyvals(self, host):
1572 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1573 host.hostname)
1574 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001575 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1576 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001577
1578
showard170873e2009-01-07 00:22:26 +00001579 def _execution_tag(self):
1580 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001581
1582
jadmanski0afbb632008-06-06 21:10:57 +00001583 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001584 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001585 keyval_dict = {queued_key: queued_time}
1586 if self.group_name:
1587 keyval_dict['host_group_name'] = self.group_name
1588 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001589 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001590 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001591 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001592 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001593 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001594 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001595 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001596 assert len(self.queue_entries) == 1
1597 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001598
1599
showard35162b02009-03-03 02:17:30 +00001600 def _write_lost_process_error_file(self):
1601 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1602 _drone_manager.write_lines_to_file(error_file_path,
1603 [_LOST_PROCESS_ERROR])
1604
1605
showardd3dc1992009-04-22 21:01:40 +00001606 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001607 if not self.monitor:
1608 return
1609
showardd9205182009-04-27 20:09:55 +00001610 self._write_job_finished()
1611
showardd3dc1992009-04-22 21:01:40 +00001612 # both of these conditionals can be true, iff the process ran, wrote a
1613 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001614 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001615 gather_task = GatherLogsTask(self.job, self.queue_entries)
1616 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001617
1618 if self.monitor.lost_process:
1619 self._write_lost_process_error_file()
1620 for queue_entry in self.queue_entries:
1621 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001622
1623
showardcbd74612008-11-19 21:42:02 +00001624 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001625 _drone_manager.write_lines_to_file(
1626 os.path.join(self._execution_tag(), 'status.log'),
1627 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001628 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001629
1630
jadmanskif7fa2cc2008-10-01 14:13:23 +00001631 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001632 if not self.monitor or not self.monitor.has_process():
1633 return
1634
jadmanskif7fa2cc2008-10-01 14:13:23 +00001635 # build up sets of all the aborted_by and aborted_on values
1636 aborted_by, aborted_on = set(), set()
1637 for queue_entry in self.queue_entries:
1638 if queue_entry.aborted_by:
1639 aborted_by.add(queue_entry.aborted_by)
1640 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1641 aborted_on.add(t)
1642
1643 # extract some actual, unique aborted by value and write it out
1644 assert len(aborted_by) <= 1
1645 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001646 aborted_by_value = aborted_by.pop()
1647 aborted_on_value = max(aborted_on)
1648 else:
1649 aborted_by_value = 'autotest_system'
1650 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001651
showarda0382352009-02-11 23:36:43 +00001652 self._write_keyval_after_job("aborted_by", aborted_by_value)
1653 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001654
showardcbd74612008-11-19 21:42:02 +00001655 aborted_on_string = str(datetime.datetime.fromtimestamp(
1656 aborted_on_value))
1657 self._write_status_comment('Job aborted by %s on %s' %
1658 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001659
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 def abort(self):
1662 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001663 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001664 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001665
1666
jadmanski0afbb632008-06-06 21:10:57 +00001667 def epilog(self):
1668 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001669 self._finish_task()
1670 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001671
1672
mblighbb421852008-03-11 22:36:16 +00001673class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001674 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001675 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001676 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001677
1678
jadmanski0afbb632008-06-06 21:10:57 +00001679 def run(self):
1680 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001681
1682
jadmanski0afbb632008-06-06 21:10:57 +00001683 def prolog(self):
1684 # recovering an existing process - don't do prolog
1685 pass
mblighbb421852008-03-11 22:36:16 +00001686
1687
showardd3dc1992009-04-22 21:01:40 +00001688class PostJobTask(AgentTask):
1689 def __init__(self, queue_entries, pidfile_name, logfile_name,
1690 run_monitor=None):
1691 """
1692 If run_monitor != None, we're recovering a running task.
1693 """
1694 self._queue_entries = queue_entries
1695 self._pidfile_name = pidfile_name
1696 self._run_monitor = run_monitor
1697
1698 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1699 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1700 self._autoserv_monitor = PidfileRunMonitor()
1701 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1702 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1703
1704 if _testing_mode:
1705 command = 'true'
1706 else:
1707 command = self._generate_command(self._results_dir)
1708
1709 super(PostJobTask, self).__init__(cmd=command,
1710 working_directory=self._execution_tag)
1711
1712 self.log_file = os.path.join(self._execution_tag, logfile_name)
1713 self._final_status = self._determine_final_status()
1714
1715
1716 def _generate_command(self, results_dir):
1717 raise NotImplementedError('Subclasses must override this')
1718
1719
1720 def _job_was_aborted(self):
1721 was_aborted = None
1722 for queue_entry in self._queue_entries:
1723 queue_entry.update_from_database()
1724 if was_aborted is None: # first queue entry
1725 was_aborted = bool(queue_entry.aborted)
1726 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1727 email_manager.manager.enqueue_notify_email(
1728 'Inconsistent abort state',
1729 'Queue entries have inconsistent abort state: ' +
1730 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1731 # don't crash here, just assume true
1732 return True
1733 return was_aborted
1734
1735
1736 def _determine_final_status(self):
1737 if self._job_was_aborted():
1738 return models.HostQueueEntry.Status.ABORTED
1739
1740 # we'll use a PidfileRunMonitor to read the autoserv exit status
1741 if self._autoserv_monitor.exit_code() == 0:
1742 return models.HostQueueEntry.Status.COMPLETED
1743 return models.HostQueueEntry.Status.FAILED
1744
1745
1746 def run(self):
1747 if self._run_monitor is not None:
1748 self.monitor = self._run_monitor
1749 else:
1750 # make sure we actually have results to work with.
1751 # this should never happen in normal operation.
1752 if not self._autoserv_monitor.has_process():
1753 email_manager.manager.enqueue_notify_email(
1754 'No results in post-job task',
1755 'No results in post-job task at %s' %
1756 self._autoserv_monitor.pidfile_id)
1757 self.finished(False)
1758 return
1759
1760 super(PostJobTask, self).run(
1761 pidfile_name=self._pidfile_name,
1762 paired_with_pidfile=self._paired_with_pidfile)
1763
1764
1765 def _set_all_statuses(self, status):
1766 for queue_entry in self._queue_entries:
1767 queue_entry.set_status(status)
1768
1769
1770 def abort(self):
1771 # override AgentTask.abort() to avoid killing the process and ending
1772 # the task. post-job tasks continue when the job is aborted.
1773 pass
1774
1775
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        # run_monitor != None means we're recovering a running gather task
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in --collect-crashinfo mode against all hosts at once
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Spawn cleanups (which reboot) or mark hosts Ready, per job policy."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            # always reboot after an aborted job
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # clean exit -- no crashinfo to collect; skip straight to done
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001842
1843
showard8fe93b52008-11-18 17:53:22 +00001844class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001845 def __init__(self, host=None, queue_entry=None):
1846 assert bool(host) ^ bool(queue_entry)
1847 if queue_entry:
1848 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001849 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001850 self.host = host
showard170873e2009-01-07 00:22:26 +00001851
1852 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001853 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1854 ['--cleanup'],
1855 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001856 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001857 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1858 failure_tasks=[repair_task])
1859
1860 self._set_ids(host=host, queue_entries=[queue_entry])
1861 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001862
mblighd5c95802008-03-05 00:33:46 +00001863
jadmanski0afbb632008-06-06 21:10:57 +00001864 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001865 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001866 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001867 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001868
mblighd5c95802008-03-05 00:33:46 +00001869
showard21baa452008-10-21 00:08:39 +00001870 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001871 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001872 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001873 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001874 self.host.update_field('dirty', 0)
1875
1876
showardd3dc1992009-04-22 21:01:40 +00001877class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001878 _num_running_parses = 0
1879
showardd3dc1992009-04-22 21:01:40 +00001880 def __init__(self, queue_entries, run_monitor=None):
1881 super(FinalReparseTask, self).__init__(queue_entries,
1882 pidfile_name=_PARSER_PID_FILE,
1883 logfile_name='.parse.log',
1884 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001885 # don't use _set_ids, since we don't want to set the host_ids
1886 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001887 self._parse_started = False
1888
showard97aed502008-11-04 02:01:24 +00001889
1890 @classmethod
1891 def _increment_running_parses(cls):
1892 cls._num_running_parses += 1
1893
1894
1895 @classmethod
1896 def _decrement_running_parses(cls):
1897 cls._num_running_parses -= 1
1898
1899
1900 @classmethod
1901 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001902 return (cls._num_running_parses <
1903 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001904
1905
1906 def prolog(self):
1907 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001908 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001909
1910
1911 def epilog(self):
1912 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001913 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001914
1915
showardd3dc1992009-04-22 21:01:40 +00001916 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00001917 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00001918 results_dir]
showard97aed502008-11-04 02:01:24 +00001919
1920
showard08a36412009-05-05 01:01:13 +00001921 def tick(self):
1922 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001923 # and we can, at which point we revert to default behavior
1924 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001925 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001926 else:
1927 self._try_starting_parse()
1928
1929
1930 def run(self):
1931 # override run() to not actually run unless we can
1932 self._try_starting_parse()
1933
1934
1935 def _try_starting_parse(self):
1936 if not self._can_run_new_parse():
1937 return
showard170873e2009-01-07 00:22:26 +00001938
showard97aed502008-11-04 02:01:24 +00001939 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001940 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001941
showard97aed502008-11-04 02:01:24 +00001942 self._increment_running_parses()
1943 self._parse_started = True
1944
1945
1946 def finished(self, success):
1947 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001948 if self._parse_started:
1949 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001950
1951
showardc9ae1782009-01-30 01:42:37 +00001952class SetEntryPendingTask(AgentTask):
1953 def __init__(self, queue_entry):
1954 super(SetEntryPendingTask, self).__init__(cmd='')
1955 self._queue_entry = queue_entry
1956 self._set_ids(queue_entries=[queue_entry])
1957
1958
1959 def run(self):
1960 agent = self._queue_entry.on_pending()
1961 if agent:
1962 self.agent.dispatcher.add_agent(agent)
1963 self.finished(True)
1964
1965
showarda3c58572009-03-12 20:36:59 +00001966class DBError(Exception):
1967 """Raised by the DBObject constructor when its select fails."""
1968
1969
mbligh36768f02008-02-22 18:28:33 +00001970class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001971 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001972
1973 # Subclasses MUST override these:
1974 _table_name = ''
1975 _fields = ()
1976
showarda3c58572009-03-12 20:36:59 +00001977 # A mapping from (type, id) to the instance of the object for that
1978 # particular id. This prevents us from creating new Job() and Host()
1979 # instances for every HostQueueEntry object that we instantiate as
1980 # multiple HQEs often share the same Job.
1981 _instances_by_type_and_id = weakref.WeakValueDictionary()
1982 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001983
showarda3c58572009-03-12 20:36:59 +00001984
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                # cache hit: reuse the live instance for this (type, id)
                return instance
        # no cached instance (or no id given) -- create a fresh object
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1995
1996
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load or initialize this object's fields from the database.

        @param id - Primary key of the row to load (exclusive with row).
        @param row - A pre-fetched SELECT row to populate fields from.
        @param new_record - True if this represents a row not yet in the DB.
        @param always_query - If False and this instance was already
                initialized (a cache hit via __new__), skip re-querying.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a requery on an already-loaded instance: log any field drift
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2025
2026
    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        # drops all cached (type, id) -> instance entries so subsequent
        # constructors re-query the database
        cls._instances_by_type_and_id.clear()
2031
2032
showardccbd6c52009-03-21 00:10:21 +00002033 def _fetch_row_from_db(self, row_id):
2034 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2035 rows = _db.execute(sql, (row_id,))
2036 if not rows:
showard76e29d12009-04-15 21:53:10 +00002037 raise DBError("row not found (table=%s, row id=%s)"
2038 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002039 return rows[0]
2040
2041
showarda3c58572009-03-12 20:36:59 +00002042 def _assert_row_length(self, row):
2043 assert len(row) == len(self._fields), (
2044 "table = %s, row = %s/%d, fields = %s/%d" % (
2045 self.__table, row, len(row), self._fields, len(self._fields)))
2046
2047
2048 def _compare_fields_in_row(self, row):
2049 """
2050 Given a row as returned by a SELECT query, compare it to our existing
2051 in memory fields.
2052
2053 @param row - A sequence of values corresponding to fields named in
2054 The class attribute _fields.
2055
2056 @returns A dictionary listing the differences keyed by field name
2057 containing tuples of (current_value, row_value).
2058 """
2059 self._assert_row_length(row)
2060 differences = {}
2061 for field, row_value in itertools.izip(self._fields, row):
2062 current_value = getattr(self, field)
2063 if current_value != row_value:
2064 differences[field] = (current_value, row_value)
2065 return differences
showard2bab8f42008-11-12 18:15:22 +00002066
2067
2068 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002069 """
2070 Update our field attributes using a single row returned by SELECT.
2071
2072 @param row - A sequence of values corresponding to fields named in
2073 the class fields list.
2074 """
2075 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002076
showard2bab8f42008-11-12 18:15:22 +00002077 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002078 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002079 setattr(self, field, value)
2080 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002081
showard2bab8f42008-11-12 18:15:22 +00002082 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002083
mblighe2586682008-02-29 22:45:46 +00002084
showardccbd6c52009-03-21 00:10:21 +00002085 def update_from_database(self):
2086 assert self.id is not None
2087 row = self._fetch_row_from_db(self.id)
2088 self._update_fields_from_row(row)
2089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def count(self, where, table = None):
2092 if not table:
2093 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002094
jadmanski0afbb632008-06-06 21:10:57 +00002095 rows = _db.execute("""
2096 SELECT count(*) FROM %s
2097 WHERE %s
2098 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002099
jadmanski0afbb632008-06-06 21:10:57 +00002100 assert len(rows) == 1
2101
2102 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002103
2104
showardd3dc1992009-04-22 21:01:40 +00002105 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002106 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002107
showard2bab8f42008-11-12 18:15:22 +00002108 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002109 return
mbligh36768f02008-02-22 18:28:33 +00002110
mblighf8c624d2008-07-03 16:58:45 +00002111 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002112 _db.execute(query, (value, self.id))
2113
showard2bab8f42008-11-12 18:15:22 +00002114 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002115
2116
jadmanski0afbb632008-06-06 21:10:57 +00002117 def save(self):
2118 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002119 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002120 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002121 values = []
2122 for key in keys:
2123 value = getattr(self, key)
2124 if value is None:
2125 values.append('NULL')
2126 else:
2127 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002128 values_str = ','.join(values)
2129 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2130 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002131 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002132 # Update our id to the one the database just assigned to us.
2133 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002134
2135
jadmanski0afbb632008-06-06 21:10:57 +00002136 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002137 self._instances_by_type_and_id.pop((type(self), id), None)
2138 self._initialized = False
2139 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002140 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2141 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002142
2143
showard63a34772008-08-18 19:32:50 +00002144 @staticmethod
2145 def _prefix_with(string, prefix):
2146 if string:
2147 string = prefix + string
2148 return string
2149
2150
jadmanski0afbb632008-06-06 21:10:57 +00002151 @classmethod
showard989f25d2008-10-01 11:38:11 +00002152 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002153 """
2154 Construct instances of our class based on the given database query.
2155
2156 @yields One class instance for each row fetched.
2157 """
showard63a34772008-08-18 19:32:50 +00002158 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2159 where = cls._prefix_with(where, 'WHERE ')
2160 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002161 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002162 'joins' : joins,
2163 'where' : where,
2164 'order_by' : order_by})
2165 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002166 for row in rows:
2167 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002168
mbligh36768f02008-02-22 18:28:33 +00002169
class IneligibleHostQueue(DBObject):
    """
    Row in ineligible_host_queues: a (job, host) pair that must not be
    scheduled together again (used to block re-assignment of a host).
    """
    # Column order must match the table schema; 'id' is always first.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002173
2174
class AtomicGroup(DBObject):
    """
    Row in atomic_groups: a named set of hosts scheduled as a unit.
    max_number_of_machines caps how many hosts one job may claim at once.
    """
    # Column order must match the table schema; 'id' is always first.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002179
2180
class Label(DBObject):
    """
    Row in labels: a host label, optionally a platform and/or tied to an
    atomic group (atomic_group_id may be NULL).
    """
    # Column order must match the table schema; 'id' is always first.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002185
2186
class Host(DBObject):
    """
    Row in the hosts table, plus the scheduler-side operations performed
    on a host: finding its current work, requeueing it, status changes,
    label lookup, and building reverify tasks.
    """
    # Column order must match the hosts table schema.
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return the single active, incomplete HostQueueEntry assigned to
        this host, or None if the host is idle.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        else:
            # A host may only be running one queue entry at a time.
            assert len(rows) == 1
            results = rows[0];
            return HostQueueEntry(row=results)


    def yield_work(self):
        """Requeue this host's current queue entry (if any) so it can be
        rescheduled, freeing this host."""
        logging.info("%s yielding work", self.hostname)
        if self.current_task():
            self.current_task().requeue()


    def set_status(self,status):
        """Persist a new status string for this host (change is logged)."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                # NOTE(review): if a host somehow has several platform
                # labels, the alphabetically last one wins here.
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """
        Return a [CleanupTask, VerifyTask] list used to re-verify this
        host.  (Both task classes are defined elsewhere in this file.)
        """
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002243
2244
class HostQueueEntry(DBObject):
    """
    Row in host_queue_entries: one (job, host) scheduling unit.  Tracks
    the entry's status state machine, its assigned host, abort handling,
    and status-change email notification.
    """
    # Column order must match the host_queue_entries table schema.
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """
        Load the entry by id or from a pre-fetched row, and eagerly
        construct the associated Job and (when assigned) Host objects.
        """
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry scheduler log, relative to the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """
        Assign (host is truthy) or release (host is None/falsy) a host for
        this entry, recording the change in the queue log and creating or
        removing the job/host ineligibility block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently assigned Host object, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row blocking (job, host_id)."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows blocking (job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """
        Set the results subdirectory for this entry; defaults to the
        assigned host's hostname when subdir is None.
        """
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned hostname, or 'no host' for display."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Set this entry's status and derive the matching active/complete
        flags; sends notification email when configured to do so.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Keep the denormalized active/complete flags consistent with the
        # status string.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        # _notify_email_statuses is a module-level config of lowercased
        # status names (or 'all').
        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this status change."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job just finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Start this entry running, assigning a host first for meta-host or
        atomic-group entries.  Delegates to Job.run(); returns its result
        (an Agent, per Job.run's usage here).
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the scheduling queue, clearing any state
        from a previous run attempt."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        # Meta-host entries give up their concrete host so the scheduler
        # can pick a fresh one.
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None if never aborted.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """
        Process an abort request for this entry.  Running/gathering/parsing
        entries are left for their post-job tasks to finish; otherwise the
        host is released (and reverified if it was mid-verify) and the
        entry is marked Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
                self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
                and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return the results path fragment '<job id>-<owner>/<subdir>'."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002518
2519
class Job(DBObject):
    """
    Row in the jobs table plus the scheduler's job-level logic: readiness
    checks, pre-job cleanup/verify task creation, grouping of synchronous
    entries, and launching autoserv via a QueueTask agent.
    """
    # Column order must match the jobs table schema.
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the job by id or from a pre-fetched row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 denotes a client-side job; anything else is
        # treated as a server job.
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' results-directory tag for this job."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set this job's status; with update_queues=True, propagate the
        status to every one of its queue entries as well.
        """
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                               status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause = None):
        """
        Count this job's queue entries, optionally restricted by an extra
        SQL boolean clause.
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of currently active entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of completed entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """
        Django queryset of this job's entries still waiting to run
        (Queued/Pending, plus Verifying unless excluded).
        """
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """
        Mark all Queued/Pending entries Stopped and release the hosts of
        Pending ones (used when synch_count can no longer be met).
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job if too few entries remain to satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Attach this job's control file to the execution; returns its path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """
        Return all entries sharing an execution_subdir with the given
        entry (i.e. the members of its synchronous group).
        """
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """
        Build the autoserv command-line argument list for running this
        job on the given (non-empty) group of queue entries.
        """
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Apply the job's reboot_before policy to the entry's host."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """True when the job asks for verify and the host's protection
        level allows it."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """
        Return the ordered task list to run before the job proper:
        optional cleanup, optional verify, then SetEntryPendingTask.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """
        Give all entries in the group a shared execution_subdir: the lone
        host's name for single-host groups, otherwise the next
        '<group_name>.group<N>' directory.
        """
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id, [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, and a string group name to suggest
                giving to this job in the results database.
        """
        if include_queue_entry.atomic_group_id:
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # Atomic groups may take up to max_number_of_machines; ordinary
        # synchronous jobs take exactly synch_count.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id))
            # TODO(gps): sort these by hostname before slicing.
            chosen_entries += list(pending_entries)[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run(self, queue_entry):
        """
        Run this job for the given entry: if not enough entries are
        Pending yet, return an Agent running the pre-job (verify) tasks;
        otherwise pick the group and launch autoserv.
        """
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark the group Starting and return the Agent wrapping the
        QueueTask that runs autoserv for it."""
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        # NOTE(review): entry_ids is computed but never used.
        entry_ids = [entry.id for entry in queue_entries]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002784
2785
# Script entry point; main() is defined earlier in this file (outside this
# excerpt) and runs the scheduler loop.
if __name__ == '__main__':
    main()