blob: 3c9948efbc25930dd7c7db1aef4ca01d3d550f4b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Module-level configuration and state for the scheduler process.
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# AUTOTEST_DIR in the environment overrides the path derived from __file__.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into each execution's results directory by the
# corresponding phase (autoserv run, crashinfo collection, results parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Mutable process-wide state; (re)assigned during startup in init() and
# main_without_exception_handling().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The .ini file is expected to interpolate the environment variables set above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Scheduler entry point: run the real main loop, making sure any
    escaping exception is logged before propagating."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # let normal sys.exit() calls terminate us without noise
        raise
    except:
        # bare except so that absolutely nothing can escape unlogged
        logging.exception('Exception escaping in monitor_db')
        raise
90
91
def main_without_exception_handling():
    """
    Parse command-line options, initialize global configuration/state, then
    run the dispatcher tick loop until _shutdown is set (see handle_sigint).

    Expects exactly one positional argument: the results directory, which
    becomes both RESULTS_DIR and the process's working directory.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific initialization hook if one is installed,
    # otherwise the no-op dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # Under test: substitute a dummy autoserv binary.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT flips _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown, also reached after an uncaught exception above.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit of the main tick loop."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000178
179
def init(logfile):
    """
    One-time scheduler startup: optional stdout/stderr redirection, pid
    file, database connections, SIGINT handler and drone manager setup.

    @param logfile - Optional path; when set, stdout/stderr are redirected
            there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Record this process's pid via the shared utils helper.
    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local name shadows the imported `drones` module
    # within this function; harmless here but confusing.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000213
214
def enable_logging(logfile):
    """
    Redirect this process's stdout and stderr to append-mode, unbuffered
    files: stdout to `logfile`, stderr to `logfile`.err. Both the
    underlying file descriptors (via dup2) and the sys-level stream
    objects are replaced.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    # Redirect at the fd level so child processes inherit the redirection.
    for log_file, stream in ((out_fd, sys.stdout), (err_fd, sys.stderr)):
        os.dup2(log_file.fileno(), stream.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000227
228
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # Fall back to the queue entry's job when no Job was given directly.
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    return command + extra_args
250
251
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
    pass
254
255
class HostScheduler(object):
    """
    Matches pending host queue entries against the pool of Ready hosts.

    refresh() snapshots all scheduling-relevant state (available hosts,
    ACLs, label membership, job dependencies) from the database into
    in-memory dicts; the find_eligible_* methods then consume that
    snapshot, popping hosts out of self._hosts_available as they are
    assigned so each host is handed out at most once per cycle.
    """

    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for every schedulable host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute `query` (containing one %s placeholder for an id list) and
        fold its two-column result into {left_id: set(right_ids)}.
        Returns {} without touching the database when id_list is empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left, right) rows into {left: set(rights)}; flip swaps roles."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it may NOT run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids containing it."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        @returns A pair of dicts over the hosts_labels rows for host_ids:
                (label_id -> set(host_ids), host_id -> set(label_ids)).
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return a dict of label id -> Label for every label in the DB."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Re-snapshot the scheduling state for this cycle: available hosts,
        plus the ACL / dependency / label data relevant to the given
        pending queue entries.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels satisfy every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes all ACL/dependency/atomic-group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's requested host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Claim the first usable, eligible host carrying the entry's metahost
        label, removing it from the availability caches; None if no match.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Find and claim a single host for a non-atomic-group queue entry,
        dispatching on whether the entry names a specific host or a
        metahost label. Returns a Host or None.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # atomic-group entries go through find_eligible_atomic_group instead
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
    def __init__(self):
        # Agent objects currently being managed by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes over self._agents:
        # host id -> set of agents, queue entry id -> set of agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
    def initialize(self, recover_hosts=True):
        """Run startup-time recovery; host recovery can be skipped via the
        --recover-hosts command-line option."""
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
    def tick(self):
        """
        One scheduler cycle: refresh drone state, run cleanups, process
        aborts/reverifies/recurring runs, schedule new jobs, drive agents,
        then flush queued drone actions and notification emails.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
    def _run_cleanup(self):
        # each cleanup object decides internally whether it is due to run
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def add_agent(self, agent):
660 self._agents.append(agent)
661 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000662 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
663 self._register_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
    def _recover_processes(self):
        # Re-attach to processes left over from a previous scheduler run.
        # Pidfiles are registered before the refresh so the drone manager
        # reads their contents in that refresh (NOTE(review): presumed
        # intent from the ordering -- confirm against drone_manager).
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000699
showard170873e2009-01-07 00:22:26 +0000700
701 def _register_pidfiles(self):
702 # during recovery we may need to read pidfiles for both running and
703 # parsing entries
704 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000705 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000706 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000707 for pidfile_name in _ALL_PIDFILE_NAMES:
708 pidfile_id = _drone_manager.get_pidfile_id_from(
709 queue_entry.execution_tag(), pidfile_name=pidfile_name)
710 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000711
712
showardd3dc1992009-04-22 21:01:40 +0000713 def _recover_entries_with_status(self, status, orphans, pidfile_name,
714 recover_entries_fn):
715 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000716 for queue_entry in queue_entries:
717 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000718 # synchronous job we've already recovered
719 continue
showardd3dc1992009-04-22 21:01:40 +0000720 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000721 execution_tag = queue_entry.execution_tag()
722 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000723 run_monitor.attach_to_existing_process(execution_tag,
724 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000725
726 log_message = ('Recovering %s entry %s ' %
727 (status.lower(),
728 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000729 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000730 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000731 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000732 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000733 continue
mbligh90a549d2008-03-25 23:52:34 +0000734
showard597bfd32009-05-08 18:22:50 +0000735 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000736 run_monitor.get_process())
737 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
738 orphans.discard(run_monitor.get_process())
739
740
741 def _kill_remaining_orphan_processes(self, orphans):
742 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000743 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000744 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000745
showard170873e2009-01-07 00:22:26 +0000746
showardd3dc1992009-04-22 21:01:40 +0000747 def _recover_running_entries(self, orphans):
748 def recover_entries(job, queue_entries, run_monitor):
749 if run_monitor is not None:
750 queue_task = RecoveryQueueTask(job=job,
751 queue_entries=queue_entries,
752 run_monitor=run_monitor)
753 self.add_agent(Agent(tasks=[queue_task],
754 num_processes=len(queue_entries)))
755 # else, _requeue_other_active_entries will cover this
756
757 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
758 orphans, '.autoserv_execute',
759 recover_entries)
760
761
762 def _recover_gathering_entries(self, orphans):
763 def recover_entries(job, queue_entries, run_monitor):
764 gather_task = GatherLogsTask(job, queue_entries,
765 run_monitor=run_monitor)
766 self.add_agent(Agent([gather_task]))
767
768 self._recover_entries_with_status(
769 models.HostQueueEntry.Status.GATHERING,
770 orphans, _CRASHINFO_PID_FILE, recover_entries)
771
772
773 def _recover_parsing_entries(self, orphans):
774 def recover_entries(job, queue_entries, run_monitor):
775 reparse_task = FinalReparseTask(queue_entries,
776 run_monitor=run_monitor)
777 self.add_agent(Agent([reparse_task], num_processes=0))
778
779 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
780 orphans, _PARSER_PID_FILE,
781 recover_entries)
782
783
    def _recover_all_recoverable_entries(self):
        # Recover entries in every recoverable state; each stage discards
        # the processes it re-attaches to from the orphan set, so whatever
        # remains has no owner and gets killed.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000790
showard97aed502008-11-04 02:01:24 +0000791
showard170873e2009-01-07 00:22:26 +0000792 def _requeue_other_active_entries(self):
793 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000794 where='active AND NOT complete AND '
795 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000796 for queue_entry in queue_entries:
797 if self.get_agents_for_entry(queue_entry):
798 # entry has already been recovered
799 continue
showardd3dc1992009-04-22 21:01:40 +0000800 if queue_entry.aborted:
801 queue_entry.abort(self)
802 continue
803
804 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000805 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000806 if queue_entry.host:
807 tasks = queue_entry.host.reverify_tasks()
808 self.add_agent(Agent(tasks))
809 agent = queue_entry.requeue()
810
811
    def _find_reverify(self):
        # Pick up hosts placed in the 'Reverify' status.  NOTE(review):
        # semantics inferred from the status name -- confirm against the
        # code that sets this status.
        self._reverify_hosts_where("status = 'Reverify'")
814
815
    def _reverify_remaining_hosts(self):
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000831
832
jadmanski0afbb632008-06-06 21:10:57 +0000833 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000834 print_message='Reverifying host %s'):
835 full_where='locked = 0 AND invalid = 0 AND ' + where
836 for host in Host.fetch(where=full_where):
837 if self.host_has_agent(host):
838 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000839 continue
showard170873e2009-01-07 00:22:26 +0000840 if print_message:
showardb18134f2009-03-20 20:52:18 +0000841 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000842 tasks = host.reverify_tasks()
843 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000844
845
jadmanski0afbb632008-06-06 21:10:57 +0000846 def _recover_hosts(self):
847 # recover "Repair Failed" hosts
848 message = 'Reverifying dead host %s'
849 self._reverify_hosts_where("status = 'Repair Failed'",
850 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000851
852
showard04c82c52008-05-29 19:38:12 +0000853
    def _get_pending_queue_entries(self):
        """Return all inactive, incomplete, Queued entries as a list."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000860
861
showard89f84db2009-03-12 20:39:13 +0000862 def _refresh_pending_queue_entries(self):
863 """
864 Lookup the pending HostQueueEntries and call our HostScheduler
865 refresh() method given that list. Return the list.
866
867 @returns A list of pending HostQueueEntries sorted in priority order.
868 """
showard63a34772008-08-18 19:32:50 +0000869 queue_entries = self._get_pending_queue_entries()
870 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000871 return []
showardb95b1bd2008-08-15 18:11:04 +0000872
showard63a34772008-08-18 19:32:50 +0000873 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000874
showard89f84db2009-03-12 20:39:13 +0000875 return queue_entries
876
877
878 def _schedule_atomic_group(self, queue_entry):
879 """
880 Schedule the given queue_entry on an atomic group of hosts.
881
882 Returns immediately if there are insufficient available hosts.
883
884 Creates new HostQueueEntries based off of queue_entry for the
885 scheduled hosts and starts them all running.
886 """
887 # This is a virtual host queue entry representing an entire
888 # atomic group, find a group and schedule their hosts.
889 group_hosts = self._host_scheduler.find_eligible_atomic_group(
890 queue_entry)
891 if not group_hosts:
892 return
893 # The first assigned host uses the original HostQueueEntry
894 group_queue_entries = [queue_entry]
895 for assigned_host in group_hosts[1:]:
896 # Create a new HQE for every additional assigned_host.
897 new_hqe = HostQueueEntry.clone(queue_entry)
898 new_hqe.save()
899 group_queue_entries.append(new_hqe)
900 assert len(group_queue_entries) == len(group_hosts)
901 for queue_entry, host in itertools.izip(group_queue_entries,
902 group_hosts):
903 self._run_queue_entry(queue_entry, host)
904
905
906 def _schedule_new_jobs(self):
907 queue_entries = self._refresh_pending_queue_entries()
908 if not queue_entries:
909 return
910
showard63a34772008-08-18 19:32:50 +0000911 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000912 if (queue_entry.atomic_group_id is None or
913 queue_entry.host_id is not None):
914 assigned_host = self._host_scheduler.find_eligible_host(
915 queue_entry)
916 if assigned_host:
917 self._run_queue_entry(queue_entry, assigned_host)
918 else:
919 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000920
921
922 def _run_queue_entry(self, queue_entry, host):
923 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000924 # in some cases (synchronous jobs with run_verify=False), agent may be
925 # None
showard9976ce92008-10-15 20:28:13 +0000926 if agent:
927 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000928
929
    def _find_aborting(self):
        # Abort the agents of every entry flagged aborted, then abort the
        # entry itself.
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000935
936
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # Decide whether a not-yet-running agent may start this cycle.
        # Checks are ordered; earlier rules take precedence.
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
958
959
    def _handle_agents(self):
        # Tick every live agent, removing finished ones and enforcing
        # process throttling on agents that have not started yet.
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is blocked, block all later nonzero ones
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000978
979
    def _process_recurring_runs(self):
        # Instantiate jobs from recurring-run templates whose start time has
        # arrived, then reschedule or delete each recurrence.
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): `dependencies` is extracted but never passed to
            # create_new_job below -- possibly an oversight; confirm.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            # one-time hosts are recreated for the new job and scheduled
            # alongside ordinary hosts
            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a failure creating one job must not stop the others
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # last iteration: remove the recurrence entirely
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1019
1020
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().

    Tracks one autoserv-style process through its pidfile, queried via the
    global drone manager; the last-read pidfile contents are cached in
    self._state (a drone_manager.PidfileContents).
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process)
        self.lost_process = False
        # when we started watching; used for the pidfile-write timeout
        self._start_time = None
        # drone-manager handle identifying the pidfile being watched
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # Prefix command with nice when a nonzero level is given.
        # NOTE(review): run() below inlines equivalent logic instead of
        # calling this helper -- possibly dead code; confirm.
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Launch `command` via the drone manager and watch its pidfile."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Watch an already-running process identified by execution_tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_tag, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the watched process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """True once the pidfile names a process (it may have exited)."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the watched process; only valid if has_process() is True."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # Refresh self._state from the pidfile.  On unparseable contents the
        # state is reset (so no stale data survives) and _PidfileException
        # is raised for _get_pidfile_info to handle.
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log and email the failure, then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        # Core state refresh; may raise _PidfileException (from
        # _read_pidfile), which _get_pidfile_info translates into a lost
        # process.
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # only give up (and notify) after the pidfile-write timeout expires
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report a generic failure exit status with zero failed tests
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Exit status of the process, or None while it is still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Failed-test count from the pidfile; asserts it was recorded."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001178
1179
class Agent(object):
    """Serially executes a queue of AgentTasks for the dispatcher.

    Exposes the union of its tasks' host_ids and queue_entry_ids so the
    dispatcher can index agents by host and by queue entry.
    """
    def __init__(self, tasks, num_processes=1):
        # the task currently being polled, or None between tasks
        self.active_task = None
        # FIFO of pending tasks; (re)created by _clear_queue below
        self.queue = None
        # back-pointer, set by Dispatcher.add_agent
        self.dispatcher = None
        # weight used by the dispatcher's process throttling
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        # fresh unbounded FIFO (maxsize 0 means infinite)
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        # flatten an iterable of id lists into a single set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # enqueue the task and give it a back-pointer to this agent
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # Poll the active task; whenever it finishes, advance through queued
        # tasks until one is left running or everything is done.
        while not self.is_done():
            if self.active_task:
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # Retire a finished active task (scheduling its failure tasks if it
        # failed), then pull the next task off the queue, if any.
        logging.info("agent picking task")
        if self.active_task is not None:
            assert self.active_task.is_done()
            if not self.active_task.success:
                self.on_task_failure()
        self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        # a failure cancels all remaining queued tasks
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        # abort tasks until the queue is empty or a task ignores the abort
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1256
showardd3dc1992009-04-22 21:01:40 +00001257
mbligh36768f02008-02-22 18:28:33 +00001258class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001259 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1260 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001261 self.done = False
1262 self.failure_tasks = failure_tasks
1263 self.started = False
1264 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001265 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001266 self.task = None
1267 self.agent = None
1268 self.monitor = None
1269 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001270 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001271 self.queue_entry_ids = []
1272 self.host_ids = []
1273 self.log_file = None
1274
1275
1276 def _set_ids(self, host=None, queue_entries=None):
1277 if queue_entries and queue_entries != [None]:
1278 self.host_ids = [entry.host.id for entry in queue_entries]
1279 self.queue_entry_ids = [entry.id for entry in queue_entries]
1280 else:
1281 assert host
1282 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001283
1284
jadmanski0afbb632008-06-06 21:10:57 +00001285 def poll(self):
showard08a36412009-05-05 01:01:13 +00001286 if not self.started:
1287 self.start()
1288 self.tick()
1289
1290
1291 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001292 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001293 exit_code = self.monitor.exit_code()
1294 if exit_code is None:
1295 return
1296 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001297 else:
1298 success = False
mbligh36768f02008-02-22 18:28:33 +00001299
jadmanski0afbb632008-06-06 21:10:57 +00001300 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001301
1302
jadmanski0afbb632008-06-06 21:10:57 +00001303 def is_done(self):
1304 return self.done
mbligh36768f02008-02-22 18:28:33 +00001305
1306
jadmanski0afbb632008-06-06 21:10:57 +00001307 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001308 if self.done:
1309 return
jadmanski0afbb632008-06-06 21:10:57 +00001310 self.done = True
1311 self.success = success
1312 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001313
1314
jadmanski0afbb632008-06-06 21:10:57 +00001315 def prolog(self):
1316 pass
mblighd64e5702008-04-04 21:39:28 +00001317
1318
jadmanski0afbb632008-06-06 21:10:57 +00001319 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001320 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001321
mbligh36768f02008-02-22 18:28:33 +00001322
jadmanski0afbb632008-06-06 21:10:57 +00001323 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001324 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001325 _drone_manager.copy_to_results_repository(
1326 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001327
1328
jadmanski0afbb632008-06-06 21:10:57 +00001329 def epilog(self):
1330 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001331
1332
jadmanski0afbb632008-06-06 21:10:57 +00001333 def start(self):
1334 assert self.agent
1335
1336 if not self.started:
1337 self.prolog()
1338 self.run()
1339
1340 self.started = True
1341
1342
1343 def abort(self):
1344 if self.monitor:
1345 self.monitor.kill()
1346 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001347 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001348 self.cleanup()
1349
1350
showard170873e2009-01-07 00:22:26 +00001351 def set_host_log_file(self, base_name, host):
1352 filename = '%s.%s' % (time.time(), base_name)
1353 self.log_file = os.path.join('hosts', host.hostname, filename)
1354
1355
showardde634ee2009-01-30 01:44:24 +00001356 def _get_consistent_execution_tag(self, queue_entries):
1357 first_execution_tag = queue_entries[0].execution_tag()
1358 for queue_entry in queue_entries[1:]:
1359 assert queue_entry.execution_tag() == first_execution_tag, (
1360 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1361 queue_entry,
1362 first_execution_tag,
1363 queue_entries[0]))
1364 return first_execution_tag
1365
1366
showarda1e74b32009-05-12 17:32:04 +00001367 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001368 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001369 if use_monitor is None:
1370 assert self.monitor
1371 use_monitor = self.monitor
1372 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001373 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001374 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001375 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001376 results_path)
showardde634ee2009-01-30 01:44:24 +00001377
showarda1e74b32009-05-12 17:32:04 +00001378
    def _parse_results(self, queue_entries):
        """Schedule a FinalReparseTask to parse results for these entries."""
        reparse_task = FinalReparseTask(queue_entries)
        # num_processes=0: presumably so parsing (throttled separately by
        # FinalReparseTask) doesn't count against drone process limits --
        # confirm against Agent/dispatcher accounting.
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1382
1383
showarda1e74b32009-05-12 17:32:04 +00001384 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1385 self._copy_results(queue_entries, use_monitor)
1386 self._parse_results(queue_entries)
1387
1388
showardd3dc1992009-04-22 21:01:40 +00001389 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001390 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001391 self.monitor = PidfileRunMonitor()
1392 self.monitor.run(self.cmd, self._working_directory,
1393 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001394 log_file=self.log_file,
1395 pidfile_name=pidfile_name,
1396 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001397
1398
showardd9205182009-04-27 20:09:55 +00001399class TaskWithJobKeyvals(object):
1400 """AgentTask mixin providing functionality to help with job keyval files."""
1401 _KEYVAL_FILE = 'keyval'
1402 def _format_keyval(self, key, value):
1403 return '%s=%s' % (key, value)
1404
1405
1406 def _keyval_path(self):
1407 """Subclasses must override this"""
1408 raise NotImplemented
1409
1410
1411 def _write_keyval_after_job(self, field, value):
1412 assert self.monitor
1413 if not self.monitor.has_process():
1414 return
1415 _drone_manager.write_lines_to_file(
1416 self._keyval_path(), [self._format_keyval(field, value)],
1417 paired_with_process=self.monitor.get_process())
1418
1419
1420 def _job_queued_keyval(self, job):
1421 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1422
1423
1424 def _write_job_finished(self):
1425 self._write_keyval_after_job("job_finished", int(time.time()))
1426
1427
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host; on failure, optionally fails an
    associated queue entry."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry now; if repair fails we explicitly fail it in
        # epilog via _fail_queue_entry
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        """Repair keyvals go into the temporary repair results directory."""
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail the associated queue entry and preserve the repair logs as
        that entry's job results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        # then copy them from the drone to the results repository
        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001498
1499
showard8fe93b52008-11-18 17:53:22 +00001500class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001501 def epilog(self):
1502 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001503 should_copy_results = (self.queue_entry and not self.success
1504 and not self.queue_entry.meta_host)
1505 if should_copy_results:
1506 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001507 destination = os.path.join(self.queue_entry.execution_tag(),
1508 os.path.basename(self.log_file))
1509 _drone_manager.copy_to_results_repository(
1510 self.monitor.get_process(), self.log_file,
1511 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001512
1513
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host, either standalone or on behalf of
    a queue entry."""
    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry or host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # if verify fails, run a repair; the entry is only failed if repair
        # also fails (see RepairTask.epilog)
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001544
1545
showardd9205182009-04-27 20:09:55 +00001546class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001547 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001548 self.job = job
1549 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001550 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001551 super(QueueTask, self).__init__(cmd, self._execution_tag())
1552 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001553
1554
showard73ec0442009-02-07 02:05:20 +00001555 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001556 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001557
1558
1559 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1560 keyval_contents = '\n'.join(self._format_keyval(key, value)
1561 for key, value in keyval_dict.iteritems())
1562 # always end with a newline to allow additional keyvals to be written
1563 keyval_contents += '\n'
1564 _drone_manager.attach_file_to_execution(self._execution_tag(),
1565 keyval_contents,
1566 file_path=keyval_path)
1567
1568
1569 def _write_keyvals_before_job(self, keyval_dict):
1570 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1571
1572
showard170873e2009-01-07 00:22:26 +00001573 def _write_host_keyvals(self, host):
1574 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1575 host.hostname)
1576 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001577 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1578 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001579
1580
showard170873e2009-01-07 00:22:26 +00001581 def _execution_tag(self):
1582 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001583
1584
jadmanski0afbb632008-06-06 21:10:57 +00001585 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001586 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001587 keyval_dict = {queued_key: queued_time}
1588 if self.group_name:
1589 keyval_dict['host_group_name'] = self.group_name
1590 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001591 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001592 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001593 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001594 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001595 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001596 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001597 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001598 assert len(self.queue_entries) == 1
1599 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001600
1601
showard35162b02009-03-03 02:17:30 +00001602 def _write_lost_process_error_file(self):
1603 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1604 _drone_manager.write_lines_to_file(error_file_path,
1605 [_LOST_PROCESS_ERROR])
1606
1607
showardd3dc1992009-04-22 21:01:40 +00001608 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001609 if not self.monitor:
1610 return
1611
showardd9205182009-04-27 20:09:55 +00001612 self._write_job_finished()
1613
showardd3dc1992009-04-22 21:01:40 +00001614 # both of these conditionals can be true, iff the process ran, wrote a
1615 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001616 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001617 gather_task = GatherLogsTask(self.job, self.queue_entries)
1618 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001619
1620 if self.monitor.lost_process:
1621 self._write_lost_process_error_file()
1622 for queue_entry in self.queue_entries:
1623 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001624
1625
showardcbd74612008-11-19 21:42:02 +00001626 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001627 _drone_manager.write_lines_to_file(
1628 os.path.join(self._execution_tag(), 'status.log'),
1629 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001630 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001631
1632
jadmanskif7fa2cc2008-10-01 14:13:23 +00001633 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001634 if not self.monitor or not self.monitor.has_process():
1635 return
1636
jadmanskif7fa2cc2008-10-01 14:13:23 +00001637 # build up sets of all the aborted_by and aborted_on values
1638 aborted_by, aborted_on = set(), set()
1639 for queue_entry in self.queue_entries:
1640 if queue_entry.aborted_by:
1641 aborted_by.add(queue_entry.aborted_by)
1642 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1643 aborted_on.add(t)
1644
1645 # extract some actual, unique aborted by value and write it out
1646 assert len(aborted_by) <= 1
1647 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001648 aborted_by_value = aborted_by.pop()
1649 aborted_on_value = max(aborted_on)
1650 else:
1651 aborted_by_value = 'autotest_system'
1652 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001653
showarda0382352009-02-11 23:36:43 +00001654 self._write_keyval_after_job("aborted_by", aborted_by_value)
1655 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001656
showardcbd74612008-11-19 21:42:02 +00001657 aborted_on_string = str(datetime.datetime.fromtimestamp(
1658 aborted_on_value))
1659 self._write_status_comment('Job aborted by %s on %s' %
1660 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001661
1662
jadmanski0afbb632008-06-06 21:10:57 +00001663 def abort(self):
1664 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001665 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001666 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001667
1668
jadmanski0afbb632008-06-06 21:10:57 +00001669 def epilog(self):
1670 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001671 self._finish_task()
1672 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001673
1674
mblighbb421852008-03-11 22:36:16 +00001675class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001676 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001677 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001678 self.monitor = run_monitor
1679 self.started = True
1680 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001681
1682
jadmanski0afbb632008-06-06 21:10:57 +00001683 def run(self):
showard5add1c82009-05-26 19:27:46 +00001684 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001685
1686
jadmanski0afbb632008-06-06 21:10:57 +00001687 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001688 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001689
1690
showardd3dc1992009-04-22 21:01:40 +00001691class PostJobTask(AgentTask):
1692 def __init__(self, queue_entries, pidfile_name, logfile_name,
1693 run_monitor=None):
1694 """
1695 If run_monitor != None, we're recovering a running task.
1696 """
1697 self._queue_entries = queue_entries
1698 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001699
1700 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1701 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1702 self._autoserv_monitor = PidfileRunMonitor()
1703 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1704 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1705
1706 if _testing_mode:
1707 command = 'true'
1708 else:
1709 command = self._generate_command(self._results_dir)
1710
1711 super(PostJobTask, self).__init__(cmd=command,
1712 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001713 # this must happen *after* the super call
1714 self.monitor = run_monitor
1715 if run_monitor:
1716 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001717
1718 self.log_file = os.path.join(self._execution_tag, logfile_name)
1719 self._final_status = self._determine_final_status()
1720
1721
1722 def _generate_command(self, results_dir):
1723 raise NotImplementedError('Subclasses must override this')
1724
1725
1726 def _job_was_aborted(self):
1727 was_aborted = None
1728 for queue_entry in self._queue_entries:
1729 queue_entry.update_from_database()
1730 if was_aborted is None: # first queue entry
1731 was_aborted = bool(queue_entry.aborted)
1732 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1733 email_manager.manager.enqueue_notify_email(
1734 'Inconsistent abort state',
1735 'Queue entries have inconsistent abort state: ' +
1736 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1737 # don't crash here, just assume true
1738 return True
1739 return was_aborted
1740
1741
1742 def _determine_final_status(self):
1743 if self._job_was_aborted():
1744 return models.HostQueueEntry.Status.ABORTED
1745
1746 # we'll use a PidfileRunMonitor to read the autoserv exit status
1747 if self._autoserv_monitor.exit_code() == 0:
1748 return models.HostQueueEntry.Status.COMPLETED
1749 return models.HostQueueEntry.Status.FAILED
1750
1751
1752 def run(self):
showard5add1c82009-05-26 19:27:46 +00001753 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001754
showard5add1c82009-05-26 19:27:46 +00001755 # make sure we actually have results to work with.
1756 # this should never happen in normal operation.
1757 if not self._autoserv_monitor.has_process():
1758 email_manager.manager.enqueue_notify_email(
1759 'No results in post-job task',
1760 'No results in post-job task at %s' %
1761 self._autoserv_monitor.pidfile_id)
1762 self.finished(False)
1763 return
1764
1765 super(PostJobTask, self).run(
1766 pidfile_name=self._pidfile_name,
1767 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001768
1769
1770 def _set_all_statuses(self, status):
1771 for queue_entry in self._queue_entries:
1772 queue_entry.set_status(status)
1773
1774
1775 def abort(self):
1776 # override AgentTask.abort() to avoid killing the process and ending
1777 # the task. post-job tasks continue when the job is aborted.
1778 pass
1779
1780
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the 'autoserv --collect-crashinfo' command line."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Schedule host cleanups (reboots) per the job's reboot_after
        setting, or mark hosts Ready when no reboot is needed."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        # aborted jobs always get a cleanup
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # only copy/parse results if autoserv actually produced any
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001848
1849
showard8fe93b52008-11-18 17:53:22 +00001850class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001851 def __init__(self, host=None, queue_entry=None):
1852 assert bool(host) ^ bool(queue_entry)
1853 if queue_entry:
1854 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001855 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001856 self.host = host
showard170873e2009-01-07 00:22:26 +00001857
1858 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001859 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1860 ['--cleanup'],
1861 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001862 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001863 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1864 failure_tasks=[repair_task])
1865
1866 self._set_ids(host=host, queue_entries=[queue_entry])
1867 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001868
mblighd5c95802008-03-05 00:33:46 +00001869
jadmanski0afbb632008-06-06 21:10:57 +00001870 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001871 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001872 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001873 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001874
mblighd5c95802008-03-05 00:33:46 +00001875
showard21baa452008-10-21 00:08:39 +00001876 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001877 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001878 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001879 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001880 self.host.update_field('dirty', 0)
1881
1882
showardd3dc1992009-04-22 21:01:40 +00001883class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001884 _num_running_parses = 0
1885
showardd3dc1992009-04-22 21:01:40 +00001886 def __init__(self, queue_entries, run_monitor=None):
1887 super(FinalReparseTask, self).__init__(queue_entries,
1888 pidfile_name=_PARSER_PID_FILE,
1889 logfile_name='.parse.log',
1890 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001891 # don't use _set_ids, since we don't want to set the host_ids
1892 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001893 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001894
showard97aed502008-11-04 02:01:24 +00001895
1896 @classmethod
1897 def _increment_running_parses(cls):
1898 cls._num_running_parses += 1
1899
1900
1901 @classmethod
1902 def _decrement_running_parses(cls):
1903 cls._num_running_parses -= 1
1904
1905
1906 @classmethod
1907 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001908 return (cls._num_running_parses <
1909 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001910
1911
1912 def prolog(self):
1913 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001914 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001915
1916
1917 def epilog(self):
1918 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001919 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001920
1921
showardd3dc1992009-04-22 21:01:40 +00001922 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00001923 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00001924 results_dir]
showard97aed502008-11-04 02:01:24 +00001925
1926
showard08a36412009-05-05 01:01:13 +00001927 def tick(self):
1928 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001929 # and we can, at which point we revert to default behavior
1930 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001931 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001932 else:
1933 self._try_starting_parse()
1934
1935
1936 def run(self):
1937 # override run() to not actually run unless we can
1938 self._try_starting_parse()
1939
1940
1941 def _try_starting_parse(self):
1942 if not self._can_run_new_parse():
1943 return
showard170873e2009-01-07 00:22:26 +00001944
showard97aed502008-11-04 02:01:24 +00001945 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001946 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001947
showard97aed502008-11-04 02:01:24 +00001948 self._increment_running_parses()
1949 self._parse_started = True
1950
1951
1952 def finished(self, success):
1953 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001954 if self._parse_started:
1955 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001956
1957
showardc9ae1782009-01-30 01:42:37 +00001958class SetEntryPendingTask(AgentTask):
1959 def __init__(self, queue_entry):
1960 super(SetEntryPendingTask, self).__init__(cmd='')
1961 self._queue_entry = queue_entry
1962 self._set_ids(queue_entries=[queue_entry])
1963
1964
1965 def run(self):
1966 agent = self._queue_entry.on_pending()
1967 if agent:
1968 self.agent.dispatcher.add_agent(agent)
1969 self.finished(True)
1970
1971
showarda3c58572009-03-12 20:36:59 +00001972class DBError(Exception):
1973 """Raised by the DBObject constructor when its select fails."""
1974
1975
mbligh36768f02008-02-22 18:28:33 +00001976class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001977 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001978
1979 # Subclasses MUST override these:
1980 _table_name = ''
1981 _fields = ()
1982
showarda3c58572009-03-12 20:36:59 +00001983 # A mapping from (type, id) to the instance of the object for that
1984 # particular id. This prevents us from creating new Job() and Host()
1985 # instances for every HostQueueEntry object that we instantiate as
1986 # multiple HQEs often share the same Job.
1987 _instances_by_type_and_id = weakref.WeakValueDictionary()
1988 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001989
showarda3c58572009-03-12 20:36:59 +00001990
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            # the cache holds weak references, so this only reuses objects
            # that are still alive elsewhere
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2001
2002
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Initialize (or refresh a cached instance) from the database.

        @param id - primary key of the row to fetch (exclusive with row).
        @param row - a pre-fetched row of values in _fields order
                (exclusive with id).
        @param new_record - True if this object doesn't yet exist in the DB.
        @param always_query - if False, a cached instance that was already
                initialized skips re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a cached instance is being refreshed; warn if the DB row changed
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2031
2032
    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing; clear the internal (type, id) -> instance cache."""
        cls._instances_by_type_and_id.clear()
2037
2038
showardccbd6c52009-03-21 00:10:21 +00002039 def _fetch_row_from_db(self, row_id):
2040 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2041 rows = _db.execute(sql, (row_id,))
2042 if not rows:
showard76e29d12009-04-15 21:53:10 +00002043 raise DBError("row not found (table=%s, row id=%s)"
2044 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002045 return rows[0]
2046
2047
showarda3c58572009-03-12 20:36:59 +00002048 def _assert_row_length(self, row):
2049 assert len(row) == len(self._fields), (
2050 "table = %s, row = %s/%d, fields = %s/%d" % (
2051 self.__table, row, len(row), self._fields, len(self._fields)))
2052
2053
2054 def _compare_fields_in_row(self, row):
2055 """
2056 Given a row as returned by a SELECT query, compare it to our existing
2057 in memory fields.
2058
2059 @param row - A sequence of values corresponding to fields named in
2060 The class attribute _fields.
2061
2062 @returns A dictionary listing the differences keyed by field name
2063 containing tuples of (current_value, row_value).
2064 """
2065 self._assert_row_length(row)
2066 differences = {}
2067 for field, row_value in itertools.izip(self._fields, row):
2068 current_value = getattr(self, field)
2069 if current_value != row_value:
2070 differences[field] = (current_value, row_value)
2071 return differences
showard2bab8f42008-11-12 18:15:22 +00002072
2073
2074 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002075 """
2076 Update our field attributes using a single row returned by SELECT.
2077
2078 @param row - A sequence of values corresponding to fields named in
2079 the class fields list.
2080 """
2081 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002082
showard2bab8f42008-11-12 18:15:22 +00002083 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002084 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002085 setattr(self, field, value)
2086 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002087
showard2bab8f42008-11-12 18:15:22 +00002088 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002089
mblighe2586682008-02-29 22:45:46 +00002090
showardccbd6c52009-03-21 00:10:21 +00002091 def update_from_database(self):
2092 assert self.id is not None
2093 row = self._fetch_row_from_db(self.id)
2094 self._update_fields_from_row(row)
2095
2096
jadmanski0afbb632008-06-06 21:10:57 +00002097 def count(self, where, table = None):
2098 if not table:
2099 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002100
jadmanski0afbb632008-06-06 21:10:57 +00002101 rows = _db.execute("""
2102 SELECT count(*) FROM %s
2103 WHERE %s
2104 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002105
jadmanski0afbb632008-06-06 21:10:57 +00002106 assert len(rows) == 1
2107
2108 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002109
2110
showardd3dc1992009-04-22 21:01:40 +00002111 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002112 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002113
showard2bab8f42008-11-12 18:15:22 +00002114 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002115 return
mbligh36768f02008-02-22 18:28:33 +00002116
mblighf8c624d2008-07-03 16:58:45 +00002117 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002118 _db.execute(query, (value, self.id))
2119
showard2bab8f42008-11-12 18:15:22 +00002120 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002121
2122
jadmanski0afbb632008-06-06 21:10:57 +00002123 def save(self):
2124 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002125 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002126 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002127 values = []
2128 for key in keys:
2129 value = getattr(self, key)
2130 if value is None:
2131 values.append('NULL')
2132 else:
2133 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002134 values_str = ','.join(values)
2135 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2136 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002137 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002138 # Update our id to the one the database just assigned to us.
2139 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002140
2141
jadmanski0afbb632008-06-06 21:10:57 +00002142 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002143 self._instances_by_type_and_id.pop((type(self), id), None)
2144 self._initialized = False
2145 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002146 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2147 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002148
2149
showard63a34772008-08-18 19:32:50 +00002150 @staticmethod
2151 def _prefix_with(string, prefix):
2152 if string:
2153 string = prefix + string
2154 return string
2155
2156
jadmanski0afbb632008-06-06 21:10:57 +00002157 @classmethod
showard989f25d2008-10-01 11:38:11 +00002158 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002159 """
2160 Construct instances of our class based on the given database query.
2161
2162 @yields One class instance for each row fetched.
2163 """
showard63a34772008-08-18 19:32:50 +00002164 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2165 where = cls._prefix_with(where, 'WHERE ')
2166 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002167 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002168 'joins' : joins,
2169 'where' : where,
2170 'order_by' : order_by})
2171 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002172 for row in rows:
2173 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002174
mbligh36768f02008-02-22 18:28:33 +00002175
class IneligibleHostQueue(DBObject):
    """Row wrapper for ineligible_host_queues: each row blocks a specific
    (job, host) pairing from being scheduled (see HostQueueEntry.block_host).
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002179
2180
class AtomicGroup(DBObject):
    """Row wrapper for atomic_groups; max_number_of_machines caps how many
    hosts may be grouped into one run (see Job._choose_group_to_run)."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002185
2186
class Label(DBObject):
    """Row wrapper for labels; a label may be a platform and may belong to
    an atomic group (atomic_group_id)."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002191
2192
mbligh36768f02008-02-22 18:28:33 +00002193class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002194 _table_name = 'hosts'
2195 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2196 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2197
2198
jadmanski0afbb632008-06-06 21:10:57 +00002199 def current_task(self):
2200 rows = _db.execute("""
2201 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2202 """, (self.id,))
2203
2204 if len(rows) == 0:
2205 return None
2206 else:
2207 assert len(rows) == 1
2208 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002209 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002210
2211
jadmanski0afbb632008-06-06 21:10:57 +00002212 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002213 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002214 if self.current_task():
2215 self.current_task().requeue()
2216
showard6ae5ea92009-02-25 00:11:51 +00002217
jadmanski0afbb632008-06-06 21:10:57 +00002218 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002219 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002220 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002221
2222
showard170873e2009-01-07 00:22:26 +00002223 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002224 """
showard170873e2009-01-07 00:22:26 +00002225 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002226 """
2227 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002228 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002229 FROM labels
2230 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002231 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002232 ORDER BY labels.name
2233 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002234 platform = None
2235 all_labels = []
2236 for label_name, is_platform in rows:
2237 if is_platform:
2238 platform = label_name
2239 all_labels.append(label_name)
2240 return platform, all_labels
2241
2242
2243 def reverify_tasks(self):
2244 cleanup_task = CleanupTask(host=self)
2245 verify_task = VerifyTask(host=self)
2246 # just to make sure this host does not get taken away
2247 self.set_status('Cleaning')
2248 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002249
2250
showard54c1ea92009-05-20 00:32:58 +00002251 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2252
2253
2254 @classmethod
2255 def cmp_for_sort(cls, a, b):
2256 """
2257 A comparison function for sorting Host objects by hostname.
2258
2259 This strips any trailing numeric digits, ignores leading 0s and
2260 compares hostnames by the leading name and the trailing digits as a
2261 number. If both hostnames do not match this pattern, they are simply
2262 compared as lower case strings.
2263
2264 Example of how hostnames will be sorted:
2265
2266 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2267
2268 This hopefully satisfy most people's hostname sorting needs regardless
2269 of their exact naming schemes. Nobody sane should have both a host10
2270 and host010 (but the algorithm works regardless).
2271 """
2272 lower_a = a.hostname.lower()
2273 lower_b = b.hostname.lower()
2274 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2275 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2276 if match_a and match_b:
2277 name_a, number_a_str = match_a.groups()
2278 name_b, number_b_str = match_b.groups()
2279 number_a = int(number_a_str.lstrip('0'))
2280 number_b = int(number_b_str.lstrip('0'))
2281 result = cmp((name_a, number_a), (name_b, number_b))
2282 if result == 0 and lower_a != lower_b:
2283 # If they compared equal above but the lower case names are
2284 # indeed different, don't report equality. abc012 != abc12.
2285 return cmp(lower_a, lower_b)
2286 return result
2287 else:
2288 return cmp(lower_a, lower_b)
2289
2290
class HostQueueEntry(DBObject):
    """One host's participation in a job: tracks status, host assignment,
    abort bookkeeping and the results location for a (job, host) pairing."""
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a database id or pre-fetched row; also loads the
        owning Job and (when assigned) the Host."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry scheduler log, written through the drone manager.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN jobs_dependency_labels AS deps "
                      "ON (labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign a host to this entry (or release it when host is None),
        recording the change in the queue log and the block list."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the currently assigned Host, or None."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Mark this entry's (job, host) pair ineligible for re-scheduling."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any ineligibility blocks for this (job, host) pair."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Hostname of the assigned host, or 'no host' (display only)."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """Set this entry's status, keeping the active/complete flags in
        sync with it and sending notification email when configured."""
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # Not-yet-started states.
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # In-progress states.
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states.
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Email a per-host summary once the whole job has finished."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """Start this entry running via its job; meta-host and atomic-group
        entries must be given a host here since they have none assigned."""
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Return this entry to the Queued state (releasing a meta-host)."""
        self.set_status('Queued')
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first call via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """Abort this entry, handling each status appropriately."""
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
            and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job id>-<owner>/<subdir>' identifying the results dir."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002564
2565
mbligh36768f02008-02-22 18:28:33 +00002566class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002567 _table_name = 'jobs'
2568 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2569 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002570 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002571 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002572
2573
    def __init__(self, id=None, row=None, **kwargs):
        """Construct from a database id or a pre-fetched row (one of the
        two must be supplied)."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002577
mblighe2586682008-02-29 22:45:46 +00002578
jadmanski0afbb632008-06-06 21:10:57 +00002579 def is_server_job(self):
2580 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002581
2582
    def tag(self):
        """Return this job's results-directory tag, "<id>-<owner>"."""
        return "%s-%s" % (self.id, self.owner)
2585
2586
jadmanski0afbb632008-06-06 21:10:57 +00002587 def get_host_queue_entries(self):
2588 rows = _db.execute("""
2589 SELECT * FROM host_queue_entries
2590 WHERE job_id= %s
2591 """, (self.id,))
2592 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002593
jadmanski0afbb632008-06-06 21:10:57 +00002594 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002595
jadmanski0afbb632008-06-06 21:10:57 +00002596 return entries
mbligh36768f02008-02-22 18:28:33 +00002597
2598
jadmanski0afbb632008-06-06 21:10:57 +00002599 def set_status(self, status, update_queues=False):
2600 self.update_field('status',status)
2601
2602 if update_queues:
2603 for queue_entry in self.get_host_queue_entries():
2604 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002605
2606
jadmanski0afbb632008-06-06 21:10:57 +00002607 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002608 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2609 status='Pending')
2610 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002611
2612
jadmanski0afbb632008-06-06 21:10:57 +00002613 def num_machines(self, clause = None):
2614 sql = "job_id=%s" % self.id
2615 if clause:
2616 sql += " AND (%s)" % clause
2617 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002618
2619
    def num_queued(self):
        """Number of this job's entries not yet complete."""
        return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002622
2623
    def num_active(self):
        """Number of this job's entries currently marked active."""
        return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002626
2627
    def num_complete(self):
        """Number of this job's entries marked complete."""
        return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002630
2631
    def is_finished(self):
        """True when every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002634
mbligh36768f02008-02-22 18:28:33 +00002635
showard6bb7c292009-01-30 01:44:51 +00002636 def _not_yet_run_entries(self, include_verifying=True):
2637 statuses = [models.HostQueueEntry.Status.QUEUED,
2638 models.HostQueueEntry.Status.PENDING]
2639 if include_verifying:
2640 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2641 return models.HostQueueEntry.objects.filter(job=self.id,
2642 status__in=statuses)
2643
2644
2645 def _stop_all_entries(self):
2646 entries_to_stop = self._not_yet_run_entries(
2647 include_verifying=False)
2648 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002649 assert not child_entry.complete, (
2650 '%s status=%s, active=%s, complete=%s' %
2651 (child_entry.id, child_entry.status, child_entry.active,
2652 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002653 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2654 child_entry.host.status = models.Host.Status.READY
2655 child_entry.host.save()
2656 child_entry.status = models.HostQueueEntry.Status.STOPPED
2657 child_entry.save()
2658
showard2bab8f42008-11-12 18:15:22 +00002659 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002660 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002661 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002662 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002663
2664
jadmanski0afbb632008-06-06 21:10:57 +00002665 def write_to_machines_file(self, queue_entry):
2666 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002667 file_path = os.path.join(self.tag(), '.machines')
2668 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002669
2670
showardf1ae3542009-05-11 19:26:02 +00002671 def _next_group_name(self, group_name=''):
2672 """@returns a directory name to use for the next host group results."""
2673 if group_name:
2674 # Sanitize for use as a pathname.
2675 group_name = group_name.replace(os.path.sep, '_')
2676 if group_name.startswith('.'):
2677 group_name = '_' + group_name[1:]
2678 # Add a separator between the group name and 'group%d'.
2679 group_name += '.'
2680 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002681 query = models.HostQueueEntry.objects.filter(
2682 job=self.id).values('execution_subdir').distinct()
2683 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002684 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2685 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002686 if ids:
2687 next_id = max(ids) + 1
2688 else:
2689 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002690 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002691
2692
showard170873e2009-01-07 00:22:26 +00002693 def _write_control_file(self, execution_tag):
2694 control_path = _drone_manager.attach_file_to_execution(
2695 execution_tag, self.control_file)
2696 return control_path
mbligh36768f02008-02-22 18:28:33 +00002697
showardb2e2c322008-10-14 17:33:55 +00002698
showard2bab8f42008-11-12 18:15:22 +00002699 def get_group_entries(self, queue_entry_from_group):
2700 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002701 return list(HostQueueEntry.fetch(
2702 where='job_id=%s AND execution_subdir=%s',
2703 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002704
2705
showardb2e2c322008-10-14 17:33:55 +00002706 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002707 assert queue_entries
2708 execution_tag = queue_entries[0].execution_tag()
2709 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002710 hostnames = ','.join([entry.get_host().hostname
2711 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002712
showard87ba02a2009-04-20 19:37:32 +00002713 params = _autoserv_command_line(
2714 hostnames, execution_tag,
2715 ['-P', execution_tag, '-n',
2716 _drone_manager.absolute_path(control_path)],
2717 job=self)
mbligh36768f02008-02-22 18:28:33 +00002718
jadmanski0afbb632008-06-06 21:10:57 +00002719 if not self.is_server_job():
2720 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002721
showardb2e2c322008-10-14 17:33:55 +00002722 return params
mblighe2586682008-02-29 22:45:46 +00002723
mbligh36768f02008-02-22 18:28:33 +00002724
showardc9ae1782009-01-30 01:42:37 +00002725 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002726 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002727 return True
showard0fc38302008-10-23 00:44:07 +00002728 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002729 return queue_entry.get_host().dirty
2730 return False
showard21baa452008-10-21 00:08:39 +00002731
showardc9ae1782009-01-30 01:42:37 +00002732
2733 def _should_run_verify(self, queue_entry):
2734 do_not_verify = (queue_entry.host.protection ==
2735 host_protections.Protection.DO_NOT_VERIFY)
2736 if do_not_verify:
2737 return False
2738 return self.run_verify
2739
2740
2741 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002742 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002743 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002744 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002745 if self._should_run_verify(queue_entry):
2746 tasks.append(VerifyTask(queue_entry=queue_entry))
2747 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002748 return tasks
2749
2750
showardf1ae3542009-05-11 19:26:02 +00002751 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002752 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002753 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002754 else:
showardf1ae3542009-05-11 19:26:02 +00002755 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002756 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002757 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002758 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002759
2760 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002761 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002762
2763
2764 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002765 """
2766 @returns A tuple containing a list of HostQueueEntry instances to be
2767 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002768 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002769 """
2770 if include_queue_entry.atomic_group_id:
2771 atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
2772 always_query=False)
2773 else:
2774 atomic_group = None
2775
showard2bab8f42008-11-12 18:15:22 +00002776 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002777 if atomic_group:
2778 num_entries_wanted = atomic_group.max_number_of_machines
2779 else:
2780 num_entries_wanted = self.synch_count
2781 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002782
showardf1ae3542009-05-11 19:26:02 +00002783 if num_entries_wanted > 0:
2784 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002785 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002786 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002787 params=(self.id, include_queue_entry.id)))
2788
2789 # Sort the chosen hosts by hostname before slicing.
2790 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2791 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2792 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2793 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002794
showardf1ae3542009-05-11 19:26:02 +00002795 # Sanity check. We'll only ever be called if this can be met.
2796 assert len(chosen_entries) >= self.synch_count
2797
2798 if atomic_group:
2799 # Look at any meta_host and dependency labels and pick the first
2800 # one that also specifies this atomic group. Use that label name
2801 # as the group name if possible (it is more specific).
2802 group_name = atomic_group.name
2803 for label in include_queue_entry.get_labels():
2804 if label.atomic_group_id:
2805 assert label.atomic_group_id == atomic_group.id
2806 group_name = label.name
2807 break
2808 else:
2809 group_name = ''
2810
2811 self._assign_new_group(chosen_entries, group_name=group_name)
2812 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002813
2814
2815 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002816 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002817 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2818 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002819
showardf1ae3542009-05-11 19:26:02 +00002820 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2821 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002822
2823
showardf1ae3542009-05-11 19:26:02 +00002824 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002825 for queue_entry in queue_entries:
2826 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002827 params = self._get_autoserv_params(queue_entries)
2828 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002829 cmd=params, group_name=group_name)
2830 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002831 entry_ids = [entry.id for entry in queue_entries]
2832
showard170873e2009-01-07 00:22:26 +00002833 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002834
2835
if __name__ == '__main__':
    # Script entry point: hand control to the scheduler's main loop,
    # defined earlier in this file.
    main()