blob: 8a5c6fc110f1738d7c45ce8c133a859065102a31 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the environment to override the computed autotest root.
# (Modern membership test replaces deprecated dict.has_key().)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# pidfile names written into an entry's execution directory by the three
# phases of a job: the autoserv run, crashinfo collection, and parsing
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level singletons shared by the dispatcher and helpers below
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Run the scheduler, logging any unexpected exception before re-raising.

    SystemExit is re-raised untouched so deliberate exits (e.g. sys.exit on
    a missing config value) are not logged as crashes.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        # bare except (not just Exception) so every escaping error is logged
        # before the process dies
        logging.exception('Exception escaping in monitor_db')
        raise
90
91
def main_without_exception_handling():
    """Parse command-line options, configure module globals from the global
    config, start the status server, and run the dispatcher loop until a
    shutdown is requested or an exception escapes the loop.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # exactly one positional argument (results_dir) is required
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # run the site-specific initialization hook, if one is installed
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # accept whitespace, comma, semicolon or colon as separators
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # test mode: substitute a dummy autoserv binary
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # main loop: handle_sigint flips _shutdown to exit gracefully
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly teardown: flush mail, stop status server, drones, DB
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown.

    Only sets the module-level _shutdown flag; the main loop in
    main_without_exception_handling polls it between ticks.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000178
179
def init(logfile):
    """One-time process initialization.

    Sets up optional stdout/stderr redirection, writes the pid file,
    connects the scheduler and Django database connections, installs the
    SIGINT handler, and initializes the drone manager from the config.

    @param logfile - path for stdout redirection, or a false value to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point the DB layer at the stress-test database instead of prod
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # comma-separated drone hostnames, plus the host that stores results
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000211
212
def enable_logging(logfile):
    """
    Redirect stdout to *logfile* and stderr to *logfile* + '.err'.

    Redirects both the OS-level file descriptors (via dup2, so child
    processes inherit the redirection) and the Python-level sys.stdout /
    sys.stderr objects.  Files are opened append-mode and unbuffered.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000225
226
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    argv = [_autoserv_path, '-p', '-m', machines,
            '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # fall back to the queue entry's job when no Job was given
        effective_job = job if job else queue_entry.job
        argv.extend(['-u', effective_job.owner, '-l', effective_job.name])
    return argv + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs (e.g. a
    host carrying more than one atomic group label)."""
252
253
class HostScheduler(object):
    """
    Matches pending HostQueueEntries to available hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, job dependency
    labels, label membership) from the database at the start of a cycle;
    find_eligible_host() / find_eligible_atomic_group() then hand out hosts,
    removing each claimed host from the in-memory availability map so it is
    not given out twice in the same cycle.
    """

    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        # map host id -> Host object for O(1) lookup and removal
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string for SQL "IN (%s)" clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # query must contain a single %s placeholder for the id list
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # build {left_id: set(right_ids)} from two-column rows; flip=True
        # swaps the roles of the two columns
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # map job id -> set of aclgroup ids its owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # map job id -> set of host ids the job must not run on
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # map job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # map host id -> set of aclgroup ids the host belongs to
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        # one query, two views: (label id -> host ids, host id -> label ids)
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # map label id -> Label object for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all DB state needed to schedule the given pending
        entries during this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner and the host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """A host with an only_if_needed label may only serve entries that
        explicitly require that label (as a dependency or metahost)."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # combine all per-host eligibility checks: ACLs, dependency labels,
        # only_if_needed labels, and atomic group compatibility
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # the entry names a specific host; claim it if it is still eligible
        # and still available (pop returns None if already taken)
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # pick any usable, eligible host carrying the metahost label
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this entry, or None if none is available now."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # atomic-group entries go through find_eligible_atomic_group instead
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
603
604
showard170873e2009-01-07 00:22:26 +0000605class Dispatcher(object):
    def __init__(self):
        # agents registered via add_agent() and driven each tick
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # indexes for fast lookup: host id -> set(agents) and
        # queue entry id -> set(agents)
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000616
mbligh36768f02008-02-22 18:28:33 +0000617
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup tasks, then recover state
        left behind by a previous scheduler instance.

        @param recover_hosts - If True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000627
628
    def tick(self):
        """Run one scheduling cycle: refresh drone state, run cleanups,
        process aborts/reverifies/recurring runs, schedule new jobs, drive
        all agents, then flush pending drone actions and queued email."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000639
showard97aed502008-11-04 02:01:24 +0000640
    def _run_cleanup(self):
        # run_cleanup_maybe() presumably rate-limits itself internally —
        # see monitor_db_cleanup for the actual scheduling policy
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000644
mbligh36768f02008-02-22 18:28:33 +0000645
showard170873e2009-01-07 00:22:26 +0000646 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
647 for object_id in object_ids:
648 agent_dict.setdefault(object_id, set()).add(agent)
649
650
651 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
652 for object_id in object_ids:
653 assert object_id in agent_dict
654 agent_dict[object_id].remove(agent)
655
656
jadmanski0afbb632008-06-06 21:10:57 +0000657 def add_agent(self, agent):
658 self._agents.append(agent)
659 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000660 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
661 self._register_agent_for_ids(self._queue_entry_agents,
662 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000663
showard170873e2009-01-07 00:22:26 +0000664
665 def get_agents_for_entry(self, queue_entry):
666 """
667 Find agents corresponding to the specified queue_entry.
668 """
showardd3dc1992009-04-22 21:01:40 +0000669 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000670
671
672 def host_has_agent(self, host):
673 """
674 Determine if there is currently an Agent present using this host.
675 """
676 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000677
678
jadmanski0afbb632008-06-06 21:10:57 +0000679 def remove_agent(self, agent):
680 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000681 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
682 agent)
683 self._unregister_agent_for_ids(self._queue_entry_agents,
684 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000685
686
jadmanski0afbb632008-06-06 21:10:57 +0000687 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000688 self._register_pidfiles()
689 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000690 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000691 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000692 self._reverify_remaining_hosts()
693 # reinitialize drones after killing orphaned processes, since they can
694 # leave around files when they die
695 _drone_manager.execute_actions()
696 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000697
showard170873e2009-01-07 00:22:26 +0000698
    def _register_pidfiles(self):
        """
        Register every pidfile that might belong to an in-flight entry with
        the drone manager, so the next refresh() fetches their contents.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000709
710
showardd3dc1992009-04-22 21:01:40 +0000711 def _recover_entries_with_status(self, status, orphans, pidfile_name,
712 recover_entries_fn):
713 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000714 for queue_entry in queue_entries:
715 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000716 # synchronous job we've already recovered
717 continue
showardd3dc1992009-04-22 21:01:40 +0000718 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000719 execution_tag = queue_entry.execution_tag()
720 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000721 run_monitor.attach_to_existing_process(execution_tag,
722 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000723
724 log_message = ('Recovering %s entry %s ' %
725 (status.lower(),
726 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000727 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000728 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000729 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000730 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000731 continue
mbligh90a549d2008-03-25 23:52:34 +0000732
showard597bfd32009-05-08 18:22:50 +0000733 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000734 run_monitor.get_process())
735 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
736 orphans.discard(run_monitor.get_process())
737
738
739 def _kill_remaining_orphan_processes(self, orphans):
740 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000741 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000742 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000743
showard170873e2009-01-07 00:22:26 +0000744
showardd3dc1992009-04-22 21:01:40 +0000745 def _recover_running_entries(self, orphans):
746 def recover_entries(job, queue_entries, run_monitor):
747 if run_monitor is not None:
748 queue_task = RecoveryQueueTask(job=job,
749 queue_entries=queue_entries,
750 run_monitor=run_monitor)
751 self.add_agent(Agent(tasks=[queue_task],
752 num_processes=len(queue_entries)))
753 # else, _requeue_other_active_entries will cover this
754
755 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
756 orphans, '.autoserv_execute',
757 recover_entries)
758
759
    def _recover_gathering_entries(self, orphans):
        """
        Reattach to entries that were in the crashinfo-gathering stage,
        whether or not their gather process is still alive.
        """
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)
769
770
    def _recover_parsing_entries(self, orphans):
        """
        Reattach to entries whose results were being parsed; parse agents
        count as zero processes for throttling purposes.
        """
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)
780
781
    def _recover_all_recoverable_entries(self):
        """
        Recover Running, Gathering and Parsing entries, then kill whatever
        autoserv processes remained unclaimed (true orphans).
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000788
showard97aed502008-11-04 02:01:24 +0000789
showard170873e2009-01-07 00:22:26 +0000790 def _requeue_other_active_entries(self):
791 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000792 where='active AND NOT complete AND '
793 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000794 for queue_entry in queue_entries:
795 if self.get_agents_for_entry(queue_entry):
796 # entry has already been recovered
797 continue
showardd3dc1992009-04-22 21:01:40 +0000798 if queue_entry.aborted:
799 queue_entry.abort(self)
800 continue
801
802 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000803 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000804 if queue_entry.host:
805 tasks = queue_entry.host.reverify_tasks()
806 self.add_agent(Agent(tasks))
807 agent = queue_entry.requeue()
808
809
showard1ff7b2e2009-05-15 23:17:18 +0000810 def _find_reverify(self):
811 self._reverify_hosts_where("status = 'Reverify'")
812
813
showard170873e2009-01-07 00:22:26 +0000814 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000815 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000816 self._reverify_hosts_where("""(status = 'Repairing' OR
817 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000818 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000819
showard170873e2009-01-07 00:22:26 +0000820 # recover "Running" hosts with no active queue entries, although this
821 # should never happen
822 message = ('Recovering running host %s - this probably indicates a '
823 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000824 self._reverify_hosts_where("""status = 'Running' AND
825 id NOT IN (SELECT host_id
826 FROM host_queue_entries
827 WHERE active)""",
828 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000829
830
jadmanski0afbb632008-06-06 21:10:57 +0000831 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000832 print_message='Reverifying host %s'):
833 full_where='locked = 0 AND invalid = 0 AND ' + where
834 for host in Host.fetch(where=full_where):
835 if self.host_has_agent(host):
836 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000837 continue
showard170873e2009-01-07 00:22:26 +0000838 if print_message:
showardb18134f2009-03-20 20:52:18 +0000839 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000840 tasks = host.reverify_tasks()
841 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000842
843
jadmanski0afbb632008-06-06 21:10:57 +0000844 def _recover_hosts(self):
845 # recover "Repair Failed" hosts
846 message = 'Reverifying dead host %s'
847 self._reverify_hosts_where("status = 'Repair Failed'",
848 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000849
850
showard04c82c52008-05-29 19:38:12 +0000851
showardb95b1bd2008-08-15 18:11:04 +0000852 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000853 # prioritize by job priority, then non-metahost over metahost, then FIFO
854 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000855 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000856 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000857 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000858
859
showard89f84db2009-03-12 20:39:13 +0000860 def _refresh_pending_queue_entries(self):
861 """
862 Lookup the pending HostQueueEntries and call our HostScheduler
863 refresh() method given that list. Return the list.
864
865 @returns A list of pending HostQueueEntries sorted in priority order.
866 """
showard63a34772008-08-18 19:32:50 +0000867 queue_entries = self._get_pending_queue_entries()
868 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000869 return []
showardb95b1bd2008-08-15 18:11:04 +0000870
showard63a34772008-08-18 19:32:50 +0000871 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000872
showard89f84db2009-03-12 20:39:13 +0000873 return queue_entries
874
875
876 def _schedule_atomic_group(self, queue_entry):
877 """
878 Schedule the given queue_entry on an atomic group of hosts.
879
880 Returns immediately if there are insufficient available hosts.
881
882 Creates new HostQueueEntries based off of queue_entry for the
883 scheduled hosts and starts them all running.
884 """
885 # This is a virtual host queue entry representing an entire
886 # atomic group, find a group and schedule their hosts.
887 group_hosts = self._host_scheduler.find_eligible_atomic_group(
888 queue_entry)
889 if not group_hosts:
890 return
891 # The first assigned host uses the original HostQueueEntry
892 group_queue_entries = [queue_entry]
893 for assigned_host in group_hosts[1:]:
894 # Create a new HQE for every additional assigned_host.
895 new_hqe = HostQueueEntry.clone(queue_entry)
896 new_hqe.save()
897 group_queue_entries.append(new_hqe)
898 assert len(group_queue_entries) == len(group_hosts)
899 for queue_entry, host in itertools.izip(group_queue_entries,
900 group_hosts):
901 self._run_queue_entry(queue_entry, host)
902
903
904 def _schedule_new_jobs(self):
905 queue_entries = self._refresh_pending_queue_entries()
906 if not queue_entries:
907 return
908
showard63a34772008-08-18 19:32:50 +0000909 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000910 if (queue_entry.atomic_group_id is None or
911 queue_entry.host_id is not None):
912 assigned_host = self._host_scheduler.find_eligible_host(
913 queue_entry)
914 if assigned_host:
915 self._run_queue_entry(queue_entry, assigned_host)
916 else:
917 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000918
919
920 def _run_queue_entry(self, queue_entry, host):
921 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000922 # in some cases (synchronous jobs with run_verify=False), agent may be
923 # None
showard9976ce92008-10-15 20:28:13 +0000924 if agent:
925 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000926
927
jadmanski0afbb632008-06-06 21:10:57 +0000928 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000929 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
930 for agent in self.get_agents_for_entry(entry):
931 agent.abort()
932 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000933
934
showard324bf812009-01-20 23:23:38 +0000935 def _can_start_agent(self, agent, num_started_this_cycle,
936 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000937 # always allow zero-process agents to run
938 if agent.num_processes == 0:
939 return True
940 # don't allow any nonzero-process agents to run after we've reached a
941 # limit (this avoids starvation of many-process agents)
942 if have_reached_limit:
943 return False
944 # total process throttling
showard324bf812009-01-20 23:23:38 +0000945 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000946 return False
947 # if a single agent exceeds the per-cycle throttling, still allow it to
948 # run when it's the first agent in the cycle
949 if num_started_this_cycle == 0:
950 return True
951 # per-cycle throttling
952 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000953 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000954 return False
955 return True
956
957
jadmanski0afbb632008-06-06 21:10:57 +0000958 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000959 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000960 have_reached_limit = False
961 # iterate over copy, so we can remove agents during iteration
962 for agent in list(self._agents):
963 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000964 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000965 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000966 continue
967 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000968 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000969 have_reached_limit):
970 have_reached_limit = True
971 continue
showard4c5374f2008-09-04 17:02:56 +0000972 num_started_this_cycle += agent.num_processes
973 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000974 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000975 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000976
977
showard29f7cd22009-04-29 21:16:24 +0000978 def _process_recurring_runs(self):
979 recurring_runs = models.RecurringRun.objects.filter(
980 start_date__lte=datetime.datetime.now())
981 for rrun in recurring_runs:
982 # Create job from template
983 job = rrun.job
984 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000985 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000986
987 host_objects = info['hosts']
988 one_time_hosts = info['one_time_hosts']
989 metahost_objects = info['meta_hosts']
990 dependencies = info['dependencies']
991 atomic_group = info['atomic_group']
992
993 for host in one_time_hosts or []:
994 this_host = models.Host.create_one_time_host(host.hostname)
995 host_objects.append(this_host)
996
997 try:
998 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000999 options=options,
showard29f7cd22009-04-29 21:16:24 +00001000 host_objects=host_objects,
1001 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001002 atomic_group=atomic_group)
1003
1004 except Exception, ex:
1005 logging.exception(ex)
1006 #TODO send email
1007
1008 if rrun.loop_count == 1:
1009 rrun.delete()
1010 else:
1011 if rrun.loop_count != 0: # if not infinite loop
1012 # calculate new start_date
1013 difference = datetime.timedelta(seconds=rrun.loop_period)
1014 rrun.start_date = rrun.start_date + difference
1015 rrun.loop_count -= 1
1016 rrun.save()
1017
1018
showard170873e2009-01-07 00:22:26 +00001019class PidfileRunMonitor(object):
1020 """
1021 Client must call either run() to start a new process or
1022 attach_to_existing_process().
1023 """
mbligh36768f02008-02-22 18:28:33 +00001024
showard170873e2009-01-07 00:22:26 +00001025 class _PidfileException(Exception):
1026 """
1027 Raised when there's some unexpected behavior with the pid file, but only
1028 used internally (never allowed to escape this class).
1029 """
mbligh36768f02008-02-22 18:28:33 +00001030
1031
showard170873e2009-01-07 00:22:26 +00001032 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001033 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001034 self._start_time = None
1035 self.pidfile_id = None
1036 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001037
1038
showard170873e2009-01-07 00:22:26 +00001039 def _add_nice_command(self, command, nice_level):
1040 if not nice_level:
1041 return command
1042 return ['nice', '-n', str(nice_level)] + command
1043
1044
1045 def _set_start_time(self):
1046 self._start_time = time.time()
1047
1048
1049 def run(self, command, working_directory, nice_level=None, log_file=None,
1050 pidfile_name=None, paired_with_pidfile=None):
1051 assert command is not None
1052 if nice_level is not None:
1053 command = ['nice', '-n', str(nice_level)] + command
1054 self._set_start_time()
1055 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001056 command, working_directory, pidfile_name=pidfile_name,
1057 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001058
1059
showardd3dc1992009-04-22 21:01:40 +00001060 def attach_to_existing_process(self, execution_tag,
1061 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001062 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001063 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1064 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001065 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001066
1067
jadmanski0afbb632008-06-06 21:10:57 +00001068 def kill(self):
showard170873e2009-01-07 00:22:26 +00001069 if self.has_process():
1070 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001071
mbligh36768f02008-02-22 18:28:33 +00001072
showard170873e2009-01-07 00:22:26 +00001073 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001074 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001075 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001076
1077
showard170873e2009-01-07 00:22:26 +00001078 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001079 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001080 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001081 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001082
1083
showard170873e2009-01-07 00:22:26 +00001084 def _read_pidfile(self, use_second_read=False):
1085 assert self.pidfile_id is not None, (
1086 'You must call run() or attach_to_existing_process()')
1087 contents = _drone_manager.get_pidfile_contents(
1088 self.pidfile_id, use_second_read=use_second_read)
1089 if contents.is_invalid():
1090 self._state = drone_manager.PidfileContents()
1091 raise self._PidfileException(contents)
1092 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001093
1094
showard21baa452008-10-21 00:08:39 +00001095 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001096 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1097 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001098 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001099 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001100 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001101
1102
1103 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001104 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001105 return
mblighbb421852008-03-11 22:36:16 +00001106
showard21baa452008-10-21 00:08:39 +00001107 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001108
showard170873e2009-01-07 00:22:26 +00001109 if self._state.process is None:
1110 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001111 return
mbligh90a549d2008-03-25 23:52:34 +00001112
showard21baa452008-10-21 00:08:39 +00001113 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001114 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001115 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001116 return
mbligh90a549d2008-03-25 23:52:34 +00001117
showard170873e2009-01-07 00:22:26 +00001118 # pid but no running process - maybe process *just* exited
1119 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001120 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001121 # autoserv exited without writing an exit code
1122 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001123 self._handle_pidfile_error(
1124 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001125
showard21baa452008-10-21 00:08:39 +00001126
1127 def _get_pidfile_info(self):
1128 """\
1129 After completion, self._state will contain:
1130 pid=None, exit_status=None if autoserv has not yet run
1131 pid!=None, exit_status=None if autoserv is running
1132 pid!=None, exit_status!=None if autoserv has completed
1133 """
1134 try:
1135 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001136 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001137 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001138
1139
showard170873e2009-01-07 00:22:26 +00001140 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001141 """\
1142 Called when no pidfile is found or no pid is in the pidfile.
1143 """
showard170873e2009-01-07 00:22:26 +00001144 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001145 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001146 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1147 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001148 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001149 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001150
1151
showard35162b02009-03-03 02:17:30 +00001152 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001153 """\
1154 Called when autoserv has exited without writing an exit status,
1155 or we've timed out waiting for autoserv to write a pid to the
1156 pidfile. In either case, we just return failure and the caller
1157 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001158
showard170873e2009-01-07 00:22:26 +00001159 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001160 """
1161 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001162 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001163 self._state.exit_status = 1
1164 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001168 self._get_pidfile_info()
1169 return self._state.exit_status
1170
1171
1172 def num_tests_failed(self):
1173 self._get_pidfile_info()
1174 assert self._state.num_tests_failed is not None
1175 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001176
1177
mbligh36768f02008-02-22 18:28:33 +00001178class Agent(object):
showard170873e2009-01-07 00:22:26 +00001179 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001180 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001181 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001182 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001183 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001184
showard170873e2009-01-07 00:22:26 +00001185 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1186 for task in tasks)
1187 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1188
showardd3dc1992009-04-22 21:01:40 +00001189 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001190 for task in tasks:
1191 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showardd3dc1992009-04-22 21:01:40 +00001194 def _clear_queue(self):
1195 self.queue = Queue.Queue(0)
1196
1197
showard170873e2009-01-07 00:22:26 +00001198 def _union_ids(self, id_lists):
1199 return set(itertools.chain(*id_lists))
1200
1201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def add_task(self, task):
1203 self.queue.put_nowait(task)
1204 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001205
1206
jadmanski0afbb632008-06-06 21:10:57 +00001207 def tick(self):
showard21baa452008-10-21 00:08:39 +00001208 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001209 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001210 self.active_task.poll()
1211 if not self.active_task.is_done():
1212 return
1213 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001214
1215
jadmanski0afbb632008-06-06 21:10:57 +00001216 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001217 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001218 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001219 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001220 if not self.active_task.success:
1221 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001222 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001223
jadmanski0afbb632008-06-06 21:10:57 +00001224 if not self.is_done():
1225 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001226
1227
jadmanski0afbb632008-06-06 21:10:57 +00001228 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001229 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001230 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1231 # get reset.
1232 new_agent = Agent(self.active_task.failure_tasks)
1233 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001234
mblighe2586682008-02-29 22:45:46 +00001235
showard4c5374f2008-09-04 17:02:56 +00001236 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001237 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001238
1239
jadmanski0afbb632008-06-06 21:10:57 +00001240 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001241 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001242
1243
showardd3dc1992009-04-22 21:01:40 +00001244 def abort(self):
showard08a36412009-05-05 01:01:13 +00001245 # abort tasks until the queue is empty or a task ignores the abort
1246 while not self.is_done():
1247 if not self.active_task:
1248 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001249 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001250 if not self.active_task.aborted:
1251 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001252 return
1253 self.active_task = None
1254
showardd3dc1992009-04-22 21:01:40 +00001255
mbligh36768f02008-02-22 18:28:33 +00001256class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001257 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1258 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001259 self.done = False
1260 self.failure_tasks = failure_tasks
1261 self.started = False
1262 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001263 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001264 self.task = None
1265 self.agent = None
1266 self.monitor = None
1267 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001268 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001269 self.queue_entry_ids = []
1270 self.host_ids = []
1271 self.log_file = None
1272
1273
1274 def _set_ids(self, host=None, queue_entries=None):
1275 if queue_entries and queue_entries != [None]:
1276 self.host_ids = [entry.host.id for entry in queue_entries]
1277 self.queue_entry_ids = [entry.id for entry in queue_entries]
1278 else:
1279 assert host
1280 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def poll(self):
showard08a36412009-05-05 01:01:13 +00001284 if not self.started:
1285 self.start()
1286 self.tick()
1287
1288
1289 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001290 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001291 exit_code = self.monitor.exit_code()
1292 if exit_code is None:
1293 return
1294 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001295 else:
1296 success = False
mbligh36768f02008-02-22 18:28:33 +00001297
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001299
1300
jadmanski0afbb632008-06-06 21:10:57 +00001301 def is_done(self):
1302 return self.done
mbligh36768f02008-02-22 18:28:33 +00001303
1304
jadmanski0afbb632008-06-06 21:10:57 +00001305 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001306 if self.done:
1307 return
jadmanski0afbb632008-06-06 21:10:57 +00001308 self.done = True
1309 self.success = success
1310 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001311
1312
jadmanski0afbb632008-06-06 21:10:57 +00001313 def prolog(self):
1314 pass
mblighd64e5702008-04-04 21:39:28 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001318 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001319
mbligh36768f02008-02-22 18:28:33 +00001320
jadmanski0afbb632008-06-06 21:10:57 +00001321 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001322 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001323 _drone_manager.copy_to_results_repository(
1324 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def epilog(self):
1328 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001329
1330
jadmanski0afbb632008-06-06 21:10:57 +00001331 def start(self):
1332 assert self.agent
1333
1334 if not self.started:
1335 self.prolog()
1336 self.run()
1337
1338 self.started = True
1339
1340
1341 def abort(self):
1342 if self.monitor:
1343 self.monitor.kill()
1344 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001345 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001346 self.cleanup()
1347
1348
showard170873e2009-01-07 00:22:26 +00001349 def set_host_log_file(self, base_name, host):
1350 filename = '%s.%s' % (time.time(), base_name)
1351 self.log_file = os.path.join('hosts', host.hostname, filename)
1352
1353
showardde634ee2009-01-30 01:44:24 +00001354 def _get_consistent_execution_tag(self, queue_entries):
1355 first_execution_tag = queue_entries[0].execution_tag()
1356 for queue_entry in queue_entries[1:]:
1357 assert queue_entry.execution_tag() == first_execution_tag, (
1358 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1359 queue_entry,
1360 first_execution_tag,
1361 queue_entries[0]))
1362 return first_execution_tag
1363
1364
    def _copy_results(self, queue_entries, use_monitor=None):
        """Copy the entries' shared execution directory to the results repo.

        @param queue_entries - non-empty list of entries sharing one
                execution tag.
        @param use_monitor - monitor whose process owns the results; defaults
                to this task's own monitor.
        """
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        # the monitor must still know its process to locate the results
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        # trailing slash: copy the directory contents, not the dir itself
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)
showardde634ee2009-01-30 01:44:24 +00001375
showarda1e74b32009-05-12 17:32:04 +00001376
    def _parse_results(self, queue_entries):
        """Schedule a FinalReparseTask for queue_entries on the dispatcher.

        num_processes=0 because the parse is throttled separately (see
        FinalReparseTask) rather than counted against the process limit.
        """
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1380
1381
showarda1e74b32009-05-12 17:32:04 +00001382 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1383 self._copy_results(queue_entries, use_monitor)
1384 self._parse_results(queue_entries)
1385
1386
    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch this task's command under a PidfileRunMonitor.

        No-op when the task has no command (e.g. recovery tasks).

        @param pidfile_name - pidfile the spawned process should write.
        @param paired_with_pidfile - existing pidfile to pair the new
                process with, if any.
        """
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001395
1396
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single 'key=value' keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file; subclasses must override.

        Bug fix: the original did `raise NotImplemented`, which raises the
        NotImplemented singleton -- not an exception class -- and therefore
        fails with a confusing TypeError instead of the intended signal.
        """
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append field=value to the keyval file, paired with the job's
        process so the drone manager copies it with the results."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) keyval recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job-finished timestamp in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))
1424
1425
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' against a host to repair it.

    If a queue entry was supplied and the repair fails, the entry's keyvals
    and repair logs are copied into the entry's execution directory and the
    entry is failed (unless it is a metahost entry, which gets reassigned).
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # repair results go to a temp dir; copied to the entry's execution
        # dir only if we end up failing the entry (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue immediately so the entry can be rescheduled elsewhere
        # while this host is being repaired
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in the temporary repair results dir
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail queue_entry_to_fail, publishing repair logs as its results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001496
1497
class PreJobTask(AgentTask):
    """Base for tasks that run before a job on a host (verify, cleanup).

    On failure of a non-metahost entry's pre-job step, the task's log is
    copied into the entry's execution directory so the user can see why
    the job never ran.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # only publish the log when there is a specific (non-meta) entry
        # that failed because of us
        should_copy_results = (self.queue_entry and not self.success
                               and not self.queue_entry.meta_host)
        if should_copy_results:
            self.queue_entry.set_execution_subdir()
            destination = os.path.join(self.queue_entry.execution_tag(),
                                       os.path.basename(self.log_file))
            _drone_manager.copy_to_results_repository(
                    self.monitor.get_process(), self.log_file,
                    destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001510
1511
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host, repairing it on failure."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # a failed verify triggers a repair, which may fail the entry
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        # NOTE(review): when constructed with host only, this passes
        # queue_entries=[None] -- presumably _set_ids tolerates that; confirm.
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001542
1543
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """Runs the main autoserv process for a job's group of queue entries.

    Writes pre-job keyvals (job/host metadata), marks entries and hosts
    Running, and on completion/abort spawns a GatherLogsTask to collect
    and publish results.
    """
    def __init__(self, job, queue_entries, cmd, group_name=''):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path, pre-job."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write per-host platform/label keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries share one tag; the first entry's suffices
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Record that autoserv's process disappeared without an exit code."""
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: job_finished keyval, log gathering,
        lost-process handling.  Safe to call from both abort() and epilog()."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO comment line to the job's status.log."""
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_tag(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001671
1672
class RecoveryQueueTask(QueueTask):
    """QueueTask variant that re-attaches to an already-running autoserv
    process (scheduler restart recovery) instead of spawning a new one."""
    def __init__(self, job, queue_entries, run_monitor):
        # cmd=None: nothing to launch, we adopt run_monitor's process
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # adopt the existing process's monitor rather than starting anything
        self.monitor = self.run_monitor


    def prolog(self):
        # recovering an existing process - don't do prolog
        pass
mblighbb421852008-03-11 22:36:16 +00001686
1687
class PostJobTask(AgentTask):
    """Base for tasks that run over a job's results after autoserv exits
    (log gathering, final parsing).

    Subclasses must implement _generate_command().  Attaches a
    PidfileRunMonitor to the original autoserv process to read its exit
    status and to pair this task's pidfile with it.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name
        self._run_monitor = run_monitor

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # monitor on the original autoserv process, used for exit status
        # and pidfile pairing
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command (list) to run; subclasses must override."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if this job's queue entries were aborted.

        All entries are expected to agree; if they disagree, a notification
        email is enqueued and we conservatively report True.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: join the per-entry state strings.  The original
                # called ', '.join() on a single formatted string, which
                # joins its *characters* ("h, o, s, t...") in the email.
                entry_states = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entry_states)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        """Map the job outcome onto a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        """Start the post-job process, or adopt the recovered one."""
        if self._run_monitor is not None:
            self.monitor = self._run_monitor
        else:
            # make sure we actually have results to work with.
            # this should never happen in normal operation.
            if not self._autoserv_monitor.has_process():
                email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
                self.finished(False)
                return

            super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        """Set every queue entry of this job to the given status."""
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1774
1775
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release the job's hosts, according
        to the job's reboot_after policy and the final job status."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # can only copy/parse if the original autoserv process is known
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001843
1844
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host, repairing it on failure."""
    def __init__(self, host=None, queue_entry=None):
        # exactly one of host/queue_entry must be supplied
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                          ['--cleanup'],
                                          queue_entry=queue_entry)
        # a failed cleanup triggers a repair, which may fail the entry
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_task])

        # NOTE(review): when constructed with host only, this passes
        # queue_entries=[None] -- presumably _set_ids tolerates that; confirm.
        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
1876
1877
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a finished job's results directory.

    Concurrency is throttled class-wide: at most
    scheduler_config.config.max_parse_processes parses run at once; a task
    that cannot start yet keeps retrying on each tick().
    """
    # class-wide count of currently-running parse processes
    _num_running_parses = 0

    def __init__(self, queue_entries, run_monitor=None):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               pidfile_name=_PARSER_PID_FILE,
                                               logfile_name='.parse.log',
                                               run_monitor=run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # entries move to the job's final status once parsing is done
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a throttle slot if we actually took one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001951
1952
class SetEntryPendingTask(AgentTask):
    """No-process task that transitions a queue entry to Pending and
    schedules whatever agent that transition produces."""
    def __init__(self, queue_entry):
        # cmd='': this task runs no external process
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        agent = self._queue_entry.on_pending()
        if agent:
            self.agent.dispatcher.add_agent(agent)
        self.finished(True)
1965
1966
class DBError(Exception):
    """Raised when a DBObject's initial SELECT cannot find its row."""
1969
1970
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    # _table_name: the database table this class maps to
    # _fields: tuple of column names, in SELECT * order (first must be 'id')
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    # becomes True once __init__ has populated fields (set per-instance)
    _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001984
showarda3c58572009-03-12 20:36:59 +00001985
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # NOTE: passing id/kwargs through to object.__new__ is a Python 2
        # idiom (deprecated there, an error in Python 3).
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1996
1997
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """Initialize (or re-initialize a cached instance) from id or row.

        @param id - primary key to SELECT by (mutually exclusive with row).
        @param row - pre-fetched row tuple in _fields order.
        @param new_record - True if this represents a not-yet-saved row.
        @param always_query - if False and this cached instance was already
                initialized, skip re-querying/refreshing entirely.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # cached instance being refreshed: log any fields that changed
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2026
2027
2028 @classmethod
2029 def _clear_instance_cache(cls):
2030 """Used for testing, clear the internal instance cache."""
2031 cls._instances_by_type_and_id.clear()
2032
2033
    def _fetch_row_from_db(self, row_id):
        """SELECT and return the row tuple for row_id.

        @raises DBError if no row with that id exists.
        """
        # table name comes from the class's own _table_name, not user input;
        # only the id is parameterized
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]
2041
2042
showarda3c58572009-03-12 20:36:59 +00002043 def _assert_row_length(self, row):
2044 assert len(row) == len(self._fields), (
2045 "table = %s, row = %s/%d, fields = %s/%d" % (
2046 self.__table, row, len(row), self._fields, len(self._fields)))
2047
2048
2049 def _compare_fields_in_row(self, row):
2050 """
2051 Given a row as returned by a SELECT query, compare it to our existing
2052 in memory fields.
2053
2054 @param row - A sequence of values corresponding to fields named in
2055 The class attribute _fields.
2056
2057 @returns A dictionary listing the differences keyed by field name
2058 containing tuples of (current_value, row_value).
2059 """
2060 self._assert_row_length(row)
2061 differences = {}
2062 for field, row_value in itertools.izip(self._fields, row):
2063 current_value = getattr(self, field)
2064 if current_value != row_value:
2065 differences[field] = (current_value, row_value)
2066 return differences
showard2bab8f42008-11-12 18:15:22 +00002067
2068
2069 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002070 """
2071 Update our field attributes using a single row returned by SELECT.
2072
2073 @param row - A sequence of values corresponding to fields named in
2074 the class fields list.
2075 """
2076 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002077
showard2bab8f42008-11-12 18:15:22 +00002078 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002079 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002080 setattr(self, field, value)
2081 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002082
showard2bab8f42008-11-12 18:15:22 +00002083 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002084
mblighe2586682008-02-29 22:45:46 +00002085
showardccbd6c52009-03-21 00:10:21 +00002086 def update_from_database(self):
2087 assert self.id is not None
2088 row = self._fetch_row_from_db(self.id)
2089 self._update_fields_from_row(row)
2090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def count(self, where, table = None):
2093 if not table:
2094 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002095
jadmanski0afbb632008-06-06 21:10:57 +00002096 rows = _db.execute("""
2097 SELECT count(*) FROM %s
2098 WHERE %s
2099 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002100
jadmanski0afbb632008-06-06 21:10:57 +00002101 assert len(rows) == 1
2102
2103 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002104
2105
showardd3dc1992009-04-22 21:01:40 +00002106 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002107 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002108
showard2bab8f42008-11-12 18:15:22 +00002109 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002110 return
mbligh36768f02008-02-22 18:28:33 +00002111
mblighf8c624d2008-07-03 16:58:45 +00002112 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002113 _db.execute(query, (value, self.id))
2114
showard2bab8f42008-11-12 18:15:22 +00002115 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002116
2117
jadmanski0afbb632008-06-06 21:10:57 +00002118 def save(self):
2119 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002120 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002121 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002122 values = []
2123 for key in keys:
2124 value = getattr(self, key)
2125 if value is None:
2126 values.append('NULL')
2127 else:
2128 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002129 values_str = ','.join(values)
2130 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2131 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002132 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002133 # Update our id to the one the database just assigned to us.
2134 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002135
2136
jadmanski0afbb632008-06-06 21:10:57 +00002137 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002138 self._instances_by_type_and_id.pop((type(self), id), None)
2139 self._initialized = False
2140 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002141 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2142 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002143
2144
showard63a34772008-08-18 19:32:50 +00002145 @staticmethod
2146 def _prefix_with(string, prefix):
2147 if string:
2148 string = prefix + string
2149 return string
2150
2151
jadmanski0afbb632008-06-06 21:10:57 +00002152 @classmethod
showard989f25d2008-10-01 11:38:11 +00002153 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002154 """
2155 Construct instances of our class based on the given database query.
2156
2157 @yields One class instance for each row fetched.
2158 """
showard63a34772008-08-18 19:32:50 +00002159 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2160 where = cls._prefix_with(where, 'WHERE ')
2161 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002162 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002163 'joins' : joins,
2164 'where' : where,
2165 'order_by' : order_by})
2166 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002167 for row in rows:
2168 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002169
mbligh36768f02008-02-22 18:28:33 +00002170
class IneligibleHostQueue(DBObject):
    # A (job, host) block: the host may not be scheduled again for that job.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002174
2175
showard89f84db2009-03-12 20:39:13 +00002176class AtomicGroup(DBObject):
2177 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002178 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2179 'invalid')
showard89f84db2009-03-12 20:39:13 +00002180
2181
showard989f25d2008-10-01 11:38:11 +00002182class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002183 _table_name = 'labels'
2184 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002185 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002186
2187
mbligh36768f02008-02-22 18:28:33 +00002188class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002189 _table_name = 'hosts'
2190 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2191 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2192
2193
jadmanski0afbb632008-06-06 21:10:57 +00002194 def current_task(self):
2195 rows = _db.execute("""
2196 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2197 """, (self.id,))
2198
2199 if len(rows) == 0:
2200 return None
2201 else:
2202 assert len(rows) == 1
2203 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002204 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002205
2206
jadmanski0afbb632008-06-06 21:10:57 +00002207 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002208 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002209 if self.current_task():
2210 self.current_task().requeue()
2211
showard6ae5ea92009-02-25 00:11:51 +00002212
jadmanski0afbb632008-06-06 21:10:57 +00002213 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002214 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002215 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002216
2217
showard170873e2009-01-07 00:22:26 +00002218 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002219 """
showard170873e2009-01-07 00:22:26 +00002220 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002221 """
2222 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002223 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002224 FROM labels
2225 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002226 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002227 ORDER BY labels.name
2228 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002229 platform = None
2230 all_labels = []
2231 for label_name, is_platform in rows:
2232 if is_platform:
2233 platform = label_name
2234 all_labels.append(label_name)
2235 return platform, all_labels
2236
2237
2238 def reverify_tasks(self):
2239 cleanup_task = CleanupTask(host=self)
2240 verify_task = VerifyTask(host=self)
2241 # just to make sure this host does not get taken away
2242 self.set_status('Cleaning')
2243 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002244
2245
showard54c1ea92009-05-20 00:32:58 +00002246 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2247
2248
2249 @classmethod
2250 def cmp_for_sort(cls, a, b):
2251 """
2252 A comparison function for sorting Host objects by hostname.
2253
2254 This strips any trailing numeric digits, ignores leading 0s and
2255 compares hostnames by the leading name and the trailing digits as a
2256 number. If both hostnames do not match this pattern, they are simply
2257 compared as lower case strings.
2258
2259 Example of how hostnames will be sorted:
2260
2261 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2262
2263 This hopefully satisfy most people's hostname sorting needs regardless
2264 of their exact naming schemes. Nobody sane should have both a host10
2265 and host010 (but the algorithm works regardless).
2266 """
2267 lower_a = a.hostname.lower()
2268 lower_b = b.hostname.lower()
2269 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2270 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2271 if match_a and match_b:
2272 name_a, number_a_str = match_a.groups()
2273 name_b, number_b_str = match_b.groups()
2274 number_a = int(number_a_str.lstrip('0'))
2275 number_b = int(number_b_str.lstrip('0'))
2276 result = cmp((name_a, number_a), (name_b, number_b))
2277 if result == 0 and lower_a != lower_b:
2278 # If they compared equal above but the lower case names are
2279 # indeed different, don't report equality. abc012 != abc12.
2280 return cmp(lower_a, lower_b)
2281 return result
2282 else:
2283 return cmp(lower_a, lower_b)
2284
2285
mbligh36768f02008-02-22 18:28:33 +00002286class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002287 _table_name = 'host_queue_entries'
2288 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002289 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002290 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002291
2292
showarda3c58572009-03-12 20:36:59 +00002293 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002294 assert id or row
showarda3c58572009-03-12 20:36:59 +00002295 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002296 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002297
jadmanski0afbb632008-06-06 21:10:57 +00002298 if self.host_id:
2299 self.host = Host(self.host_id)
2300 else:
2301 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002302
showard170873e2009-01-07 00:22:26 +00002303 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002304 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002305
2306
showard89f84db2009-03-12 20:39:13 +00002307 @classmethod
2308 def clone(cls, template):
2309 """
2310 Creates a new row using the values from a template instance.
2311
2312 The new instance will not exist in the database or have a valid
2313 id attribute until its save() method is called.
2314 """
2315 assert isinstance(template, cls)
2316 new_row = [getattr(template, field) for field in cls._fields]
2317 clone = cls(row=new_row, new_record=True)
2318 clone.id = None
2319 return clone
2320
2321
showardc85c21b2008-11-24 22:17:37 +00002322 def _view_job_url(self):
2323 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2324
2325
showardf1ae3542009-05-11 19:26:02 +00002326 def get_labels(self):
2327 """
2328 Get all labels associated with this host queue entry (either via the
2329 meta_host or as a job dependency label). The labels yielded are not
2330 guaranteed to be unique.
2331
2332 @yields Label instances associated with this host_queue_entry.
2333 """
2334 if self.meta_host:
2335 yield Label(id=self.meta_host, always_query=False)
2336 labels = Label.fetch(
2337 joins="JOIN jobs_dependency_labels AS deps "
2338 "ON (labels.id = deps.label_id)",
2339 where="deps.job_id = %d" % self.job.id)
2340 for label in labels:
2341 yield label
2342
2343
jadmanski0afbb632008-06-06 21:10:57 +00002344 def set_host(self, host):
2345 if host:
2346 self.queue_log_record('Assigning host ' + host.hostname)
2347 self.update_field('host_id', host.id)
2348 self.update_field('active', True)
2349 self.block_host(host.id)
2350 else:
2351 self.queue_log_record('Releasing host')
2352 self.unblock_host(self.host.id)
2353 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002354
jadmanski0afbb632008-06-06 21:10:57 +00002355 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002356
2357
jadmanski0afbb632008-06-06 21:10:57 +00002358 def get_host(self):
2359 return self.host
mbligh36768f02008-02-22 18:28:33 +00002360
2361
jadmanski0afbb632008-06-06 21:10:57 +00002362 def queue_log_record(self, log_line):
2363 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002364 _drone_manager.write_lines_to_file(self.queue_log_path,
2365 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002366
2367
jadmanski0afbb632008-06-06 21:10:57 +00002368 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002369 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002370 row = [0, self.job.id, host_id]
2371 block = IneligibleHostQueue(row=row, new_record=True)
2372 block.save()
mblighe2586682008-02-29 22:45:46 +00002373
2374
jadmanski0afbb632008-06-06 21:10:57 +00002375 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002376 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002377 blocks = IneligibleHostQueue.fetch(
2378 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2379 for block in blocks:
2380 block.delete()
mblighe2586682008-02-29 22:45:46 +00002381
2382
showard2bab8f42008-11-12 18:15:22 +00002383 def set_execution_subdir(self, subdir=None):
2384 if subdir is None:
2385 assert self.get_host()
2386 subdir = self.get_host().hostname
2387 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002388
2389
showard6355f6b2008-12-05 18:52:13 +00002390 def _get_hostname(self):
2391 if self.host:
2392 return self.host.hostname
2393 return 'no host'
2394
2395
showard170873e2009-01-07 00:22:26 +00002396 def __str__(self):
2397 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2398
2399
jadmanski0afbb632008-06-06 21:10:57 +00002400 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002401 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002402
showardb18134f2009-03-20 20:52:18 +00002403 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002404
showardc85c21b2008-11-24 22:17:37 +00002405 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002406 self.update_field('complete', False)
2407 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002408
jadmanski0afbb632008-06-06 21:10:57 +00002409 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002410 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002411 self.update_field('complete', False)
2412 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002413
showardc85c21b2008-11-24 22:17:37 +00002414 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002415 self.update_field('complete', True)
2416 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002417
2418 should_email_status = (status.lower() in _notify_email_statuses or
2419 'all' in _notify_email_statuses)
2420 if should_email_status:
2421 self._email_on_status(status)
2422
2423 self._email_on_job_complete()
2424
2425
2426 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002427 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002428
2429 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2430 self.job.id, self.job.name, hostname, status)
2431 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2432 self.job.id, self.job.name, hostname, status,
2433 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002434 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002435
2436
2437 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002438 if not self.job.is_finished():
2439 return
showard542e8402008-09-19 20:16:18 +00002440
showardc85c21b2008-11-24 22:17:37 +00002441 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002442 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002443 for queue_entry in hosts_queue:
2444 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002445 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002446 queue_entry.status))
2447
2448 summary_text = "\n".join(summary_text)
2449 status_counts = models.Job.objects.get_status_counts(
2450 [self.job.id])[self.job.id]
2451 status = ', '.join('%d %s' % (count, status) for status, count
2452 in status_counts.iteritems())
2453
2454 subject = 'Autotest: Job ID: %s "%s" %s' % (
2455 self.job.id, self.job.name, status)
2456 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2457 self.job.id, self.job.name, status, self._view_job_url(),
2458 summary_text)
showard170873e2009-01-07 00:22:26 +00002459 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002460
2461
showard89f84db2009-03-12 20:39:13 +00002462 def run(self, assigned_host=None):
2463 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002464 assert assigned_host
2465 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002466 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002467
showardb18134f2009-03-20 20:52:18 +00002468 logging.info("%s/%s/%s scheduled on %s, status=%s",
2469 self.job.name, self.meta_host, self.atomic_group_id,
2470 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002471
jadmanski0afbb632008-06-06 21:10:57 +00002472 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002473
showard6ae5ea92009-02-25 00:11:51 +00002474
jadmanski0afbb632008-06-06 21:10:57 +00002475 def requeue(self):
2476 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002477 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002478 # verify/cleanup failure sets the execution subdir, so reset it here
2479 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002480 if self.meta_host:
2481 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002482
2483
jadmanski0afbb632008-06-06 21:10:57 +00002484 def handle_host_failure(self):
2485 """\
2486 Called when this queue entry's host has failed verification and
2487 repair.
2488 """
2489 assert not self.meta_host
2490 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002491 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002492
2493
jadmanskif7fa2cc2008-10-01 14:13:23 +00002494 @property
2495 def aborted_by(self):
2496 self._load_abort_info()
2497 return self._aborted_by
2498
2499
2500 @property
2501 def aborted_on(self):
2502 self._load_abort_info()
2503 return self._aborted_on
2504
2505
2506 def _load_abort_info(self):
2507 """ Fetch info about who aborted the job. """
2508 if hasattr(self, "_aborted_by"):
2509 return
2510 rows = _db.execute("""
2511 SELECT users.login, aborted_host_queue_entries.aborted_on
2512 FROM aborted_host_queue_entries
2513 INNER JOIN users
2514 ON users.id = aborted_host_queue_entries.aborted_by_id
2515 WHERE aborted_host_queue_entries.queue_entry_id = %s
2516 """, (self.id,))
2517 if rows:
2518 self._aborted_by, self._aborted_on = rows[0]
2519 else:
2520 self._aborted_by = self._aborted_on = None
2521
2522
showardb2e2c322008-10-14 17:33:55 +00002523 def on_pending(self):
2524 """
2525 Called when an entry in a synchronous job has passed verify. If the
2526 job is ready to run, returns an agent to run the job. Returns None
2527 otherwise.
2528 """
2529 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002530 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002531 if self.job.is_ready():
2532 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002533 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002534 return None
2535
2536
showardd3dc1992009-04-22 21:01:40 +00002537 def abort(self, dispatcher):
2538 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002539
showardd3dc1992009-04-22 21:01:40 +00002540 Status = models.HostQueueEntry.Status
2541 has_running_job_agent = (
2542 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2543 and dispatcher.get_agents_for_entry(self))
2544 if has_running_job_agent:
2545 # do nothing; post-job tasks will finish and then mark this entry
2546 # with status "Aborted" and take care of the host
2547 return
2548
2549 if self.status in (Status.STARTING, Status.PENDING):
2550 self.host.set_status(models.Host.Status.READY)
2551 elif self.status == Status.VERIFYING:
2552 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2553
2554 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002555
2556 def execution_tag(self):
2557 assert self.execution_subdir
2558 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002559
2560
mbligh36768f02008-02-22 18:28:33 +00002561class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002562 _table_name = 'jobs'
2563 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2564 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002565 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002566 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002567
2568
showarda3c58572009-03-12 20:36:59 +00002569 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002570 assert id or row
showarda3c58572009-03-12 20:36:59 +00002571 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002572
mblighe2586682008-02-29 22:45:46 +00002573
jadmanski0afbb632008-06-06 21:10:57 +00002574 def is_server_job(self):
2575 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002576
2577
showard170873e2009-01-07 00:22:26 +00002578 def tag(self):
2579 return "%s-%s" % (self.id, self.owner)
2580
2581
jadmanski0afbb632008-06-06 21:10:57 +00002582 def get_host_queue_entries(self):
2583 rows = _db.execute("""
2584 SELECT * FROM host_queue_entries
2585 WHERE job_id= %s
2586 """, (self.id,))
2587 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002588
jadmanski0afbb632008-06-06 21:10:57 +00002589 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002590
jadmanski0afbb632008-06-06 21:10:57 +00002591 return entries
mbligh36768f02008-02-22 18:28:33 +00002592
2593
jadmanski0afbb632008-06-06 21:10:57 +00002594 def set_status(self, status, update_queues=False):
2595 self.update_field('status',status)
2596
2597 if update_queues:
2598 for queue_entry in self.get_host_queue_entries():
2599 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002600
2601
jadmanski0afbb632008-06-06 21:10:57 +00002602 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002603 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2604 status='Pending')
2605 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def num_machines(self, clause = None):
2609 sql = "job_id=%s" % self.id
2610 if clause:
2611 sql += " AND (%s)" % clause
2612 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002613
2614
jadmanski0afbb632008-06-06 21:10:57 +00002615 def num_queued(self):
2616 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002617
2618
jadmanski0afbb632008-06-06 21:10:57 +00002619 def num_active(self):
2620 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002621
2622
jadmanski0afbb632008-06-06 21:10:57 +00002623 def num_complete(self):
2624 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002625
2626
jadmanski0afbb632008-06-06 21:10:57 +00002627 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002628 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002629
mbligh36768f02008-02-22 18:28:33 +00002630
showard6bb7c292009-01-30 01:44:51 +00002631 def _not_yet_run_entries(self, include_verifying=True):
2632 statuses = [models.HostQueueEntry.Status.QUEUED,
2633 models.HostQueueEntry.Status.PENDING]
2634 if include_verifying:
2635 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2636 return models.HostQueueEntry.objects.filter(job=self.id,
2637 status__in=statuses)
2638
2639
2640 def _stop_all_entries(self):
2641 entries_to_stop = self._not_yet_run_entries(
2642 include_verifying=False)
2643 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002644 assert not child_entry.complete, (
2645 '%s status=%s, active=%s, complete=%s' %
2646 (child_entry.id, child_entry.status, child_entry.active,
2647 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002648 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2649 child_entry.host.status = models.Host.Status.READY
2650 child_entry.host.save()
2651 child_entry.status = models.HostQueueEntry.Status.STOPPED
2652 child_entry.save()
2653
showard2bab8f42008-11-12 18:15:22 +00002654 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002655 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002656 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002657 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002658
2659
jadmanski0afbb632008-06-06 21:10:57 +00002660 def write_to_machines_file(self, queue_entry):
2661 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002662 file_path = os.path.join(self.tag(), '.machines')
2663 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002664
2665
showardf1ae3542009-05-11 19:26:02 +00002666 def _next_group_name(self, group_name=''):
2667 """@returns a directory name to use for the next host group results."""
2668 if group_name:
2669 # Sanitize for use as a pathname.
2670 group_name = group_name.replace(os.path.sep, '_')
2671 if group_name.startswith('.'):
2672 group_name = '_' + group_name[1:]
2673 # Add a separator between the group name and 'group%d'.
2674 group_name += '.'
2675 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002676 query = models.HostQueueEntry.objects.filter(
2677 job=self.id).values('execution_subdir').distinct()
2678 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002679 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2680 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002681 if ids:
2682 next_id = max(ids) + 1
2683 else:
2684 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002685 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002686
2687
showard170873e2009-01-07 00:22:26 +00002688 def _write_control_file(self, execution_tag):
2689 control_path = _drone_manager.attach_file_to_execution(
2690 execution_tag, self.control_file)
2691 return control_path
mbligh36768f02008-02-22 18:28:33 +00002692
showardb2e2c322008-10-14 17:33:55 +00002693
showard2bab8f42008-11-12 18:15:22 +00002694 def get_group_entries(self, queue_entry_from_group):
2695 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002696 return list(HostQueueEntry.fetch(
2697 where='job_id=%s AND execution_subdir=%s',
2698 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002699
2700
showardb2e2c322008-10-14 17:33:55 +00002701 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002702 assert queue_entries
2703 execution_tag = queue_entries[0].execution_tag()
2704 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002705 hostnames = ','.join([entry.get_host().hostname
2706 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002707
showard87ba02a2009-04-20 19:37:32 +00002708 params = _autoserv_command_line(
2709 hostnames, execution_tag,
2710 ['-P', execution_tag, '-n',
2711 _drone_manager.absolute_path(control_path)],
2712 job=self)
mbligh36768f02008-02-22 18:28:33 +00002713
jadmanski0afbb632008-06-06 21:10:57 +00002714 if not self.is_server_job():
2715 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002716
showardb2e2c322008-10-14 17:33:55 +00002717 return params
mblighe2586682008-02-29 22:45:46 +00002718
mbligh36768f02008-02-22 18:28:33 +00002719
showardc9ae1782009-01-30 01:42:37 +00002720 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002721 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002722 return True
showard0fc38302008-10-23 00:44:07 +00002723 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002724 return queue_entry.get_host().dirty
2725 return False
showard21baa452008-10-21 00:08:39 +00002726
showardc9ae1782009-01-30 01:42:37 +00002727
2728 def _should_run_verify(self, queue_entry):
2729 do_not_verify = (queue_entry.host.protection ==
2730 host_protections.Protection.DO_NOT_VERIFY)
2731 if do_not_verify:
2732 return False
2733 return self.run_verify
2734
2735
2736 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002737 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002738 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002739 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002740 if self._should_run_verify(queue_entry):
2741 tasks.append(VerifyTask(queue_entry=queue_entry))
2742 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002743 return tasks
2744
2745
showardf1ae3542009-05-11 19:26:02 +00002746 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002747 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002748 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002749 else:
showardf1ae3542009-05-11 19:26:02 +00002750 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002751 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002752 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002753 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002754
2755 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002756 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002757
2758
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the set of queue entries that will run together as one group.

        include_queue_entry is always part of the group; additional Pending
        entries of the same job are pulled from the database as needed to
        reach the desired group size (synch_count, or the atomic group's
        max_number_of_machines when the entry belongs to an atomic group).

        @param include_queue_entry: HostQueueEntry that must be in the group.
        @returns A tuple (queue_entries, group_name): the HostQueueEntry
                instances chosen to run this Job, and a string group name to
                suggest giving to this job in the results database.
        """
        if include_queue_entry.atomic_group_id:
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # Atomic groups may run on more machines than the job's synch_count.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing, so the
            # subset of pending entries taken is deterministic.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002808
2809
2810 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002811 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002812 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2813 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002814
showardf1ae3542009-05-11 19:26:02 +00002815 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2816 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002817
2818
showardf1ae3542009-05-11 19:26:02 +00002819 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002820 for queue_entry in queue_entries:
2821 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002822 params = self._get_autoserv_params(queue_entries)
2823 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002824 cmd=params, group_name=group_name)
2825 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002826 entry_ids = [entry.id for entry in queue_entries]
2827
showard170873e2009-01-07 00:22:26 +00002828 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002829
2830
mbligh36768f02008-02-22 18:28:33 +00002831if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002832 main()