blob: 64dcdc5e4cc6de9e29c3753f45266d1694e3e832 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard29f7cd22009-04-29 21:16:24 +000017from autotest_lib.frontend.afe import models, rpc_utils
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results location; overwritten from the command line in main().
RESULTS_DIR = '.'
# Niceness applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the scheduler's database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest tree; defaults to the parent of this file's directory
# but may be overridden through the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# NOTE: dict.has_key() is deprecated (and gone in Python 3); use "in".
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names autoserv/parser write inside an execution's results directory.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)
46
# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized by init()/main().
_db = None                 # DatabaseConnection; set up in init()
_shutdown = False          # set True by handle_sigint() to stop the main loop
# Paths to the autoserv and parser executables on the drones.
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False      # set by the --test command-line flag
_base_url = None           # AFE base URL used when composing notification links
_notify_email_statuses = []  # HQE statuses that trigger notification email
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
# NOTE: dict.has_key() is deprecated (and gone in Python 3); use "in".
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# The .ini file reads the environment variables exported above.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
mbligh83c1e9e2009-05-01 23:10:41 +000078def _site_init_monitor_db_dummy():
79 return {}
80
81
def main():
    """Scheduler entry point: run and log any exception that escapes."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (e.g. sys.exit on bad config) propagate untouched.
        raise
    except:
        # Bare except is intentional here: record the traceback for *any*
        # escaping failure before re-raising it.
        logging.exception('Exception escaping in monitor_db')
        raise
90
91
def main_without_exception_handling():
    """
    Parse options, configure global state, then run the dispatcher loop
    until _shutdown is set (see handle_sigint) or an exception occurs.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give a site-specific hook a chance to run, if one is installed.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value may be delimited by whitespace, commas, semicolons or
    # colons; empty tokens are dropped.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    # Status server runs in its own thread; started before the main loop.
    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Mail the stacktrace rather than crash silently; cleanup below
        # still runs so we shut down components in an orderly way.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000172
173
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000178
179
def init(logfile):
    """
    One-time scheduler startup: optional log redirection, pidfile, database
    connection, SIGINT handler and drone manager initialization.

    @param logfile - Path for stdout/stderr redirection, or falsy to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Comma-separated drone hostnames, e.g. "drone1,drone2"; defaults to a
    # single local drone.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000211
212
def enable_logging(logfile):
    """
    Redirect this process's stdout/stderr to logfile / logfile + ".err".

    Uses dup2 so output from child processes and C-level writes is captured
    too, then rebinds sys.stdout/sys.stderr to the (unbuffered) file objects.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    # buffering=0: unbuffered, so log lines appear immediately.
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000225
226
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build an autoserv invocation as a list of executable + arguments.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments appended verbatim.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
252
253
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against ready hosts.

    refresh() bulk-loads all scheduling state for the cycle (ready hosts,
    ACLs, labels, job dependencies); the find_eligible_* methods then make
    per-entry decisions purely from that cached in-memory state, removing
    hosts from the cache as they are handed out so no host is scheduled
    twice within a single cycle.
    """

    def _get_ready_hosts(self):
        """Return {host id: Host} for unlocked, idle, Ready hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render id_list as a comma-separated string for an SQL IN clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute query (which must contain one %s placeholder for the id
        list) and return its two-column result as a dict of sets; see
        _process_many2many_dict for the exact shape.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Fold two-column rows into {left id: set(right ids)}; with flip=True
        the mapping direction is reversed.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job id: set(ACL group ids of the job's owner)}."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job id: set(host ids marked ineligible for that job)}."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job id: set(label ids the job depends on)}."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host id: set(ACL group ids containing that host)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return the label<->host mapping in both directions:
        ({label id: set(host ids)}, {host id: set(label ids)}).
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Reload all cached scheduling state for this cycle, scoped to the
        jobs of pending_queue_entries and the currently ready hosts.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for queue_entry's job."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """
        Claim the specific host requested by queue_entry, or return None if
        it is unavailable/ineligible.  Pops the host from the cache.
        """
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if host_id is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Claim any eligible host carrying the entry's metahost label, or
        return None.  Pops the chosen host from the cached state.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        Return a Host for this (non-atomic-group) entry, or None.  Specific-
        host entries must have host_id set; metahost entries are matched by
        label.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
603
604
showard170873e2009-01-07 00:22:26 +0000605class Dispatcher(object):
    def __init__(self):
        # Agents currently being driven by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes: host id / queue entry id -> set of Agents touching that
        # object; kept in sync by add_agent()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000616
mbligh36768f02008-02-22 18:28:33 +0000617
    def initialize(self, recover_hosts=True):
        """
        One-time startup: prime the cleanup tasks, then recover any state
        left over from a previous scheduler instance.

        @param recover_hosts - If True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000627
628
    def tick(self):
        """
        One iteration of the main scheduling loop; the call order below is
        significant (refresh drone state first, flush actions/email last).
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000639
showard97aed502008-11-04 02:01:24 +0000640
    def _run_cleanup(self):
        # Each cleanup object decides internally whether its interval has
        # elapsed ("maybe").
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000644
mbligh36768f02008-02-22 18:28:33 +0000645
showard170873e2009-01-07 00:22:26 +0000646 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
647 for object_id in object_ids:
648 agent_dict.setdefault(object_id, set()).add(agent)
649
650
651 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
652 for object_id in object_ids:
653 assert object_id in agent_dict
654 agent_dict[object_id].remove(agent)
655
656
jadmanski0afbb632008-06-06 21:10:57 +0000657 def add_agent(self, agent):
658 self._agents.append(agent)
659 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000660 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
661 self._register_agent_for_ids(self._queue_entry_agents,
662 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000663
showard170873e2009-01-07 00:22:26 +0000664
665 def get_agents_for_entry(self, queue_entry):
666 """
667 Find agents corresponding to the specified queue_entry.
668 """
showardd3dc1992009-04-22 21:01:40 +0000669 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000670
671
672 def host_has_agent(self, host):
673 """
674 Determine if there is currently an Agent present using this host.
675 """
676 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000677
678
jadmanski0afbb632008-06-06 21:10:57 +0000679 def remove_agent(self, agent):
680 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000681 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
682 agent)
683 self._unregister_agent_for_ids(self._queue_entry_agents,
684 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000685
686
jadmanski0afbb632008-06-06 21:10:57 +0000687 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000688 self._register_pidfiles()
689 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000690 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000691 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000692 self._reverify_remaining_hosts()
693 # reinitialize drones after killing orphaned processes, since they can
694 # leave around files when they die
695 _drone_manager.execute_actions()
696 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000697
showard170873e2009-01-07 00:22:26 +0000698
699 def _register_pidfiles(self):
700 # during recovery we may need to read pidfiles for both running and
701 # parsing entries
702 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000703 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000704 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000705 for pidfile_name in _ALL_PIDFILE_NAMES:
706 pidfile_id = _drone_manager.get_pidfile_id_from(
707 queue_entry.execution_tag(), pidfile_name=pidfile_name)
708 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000709
710
showardd3dc1992009-04-22 21:01:40 +0000711 def _recover_entries_with_status(self, status, orphans, pidfile_name,
712 recover_entries_fn):
713 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000714 for queue_entry in queue_entries:
715 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000716 # synchronous job we've already recovered
717 continue
showardd3dc1992009-04-22 21:01:40 +0000718 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000719 execution_tag = queue_entry.execution_tag()
720 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000721 run_monitor.attach_to_existing_process(execution_tag,
722 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000723
724 log_message = ('Recovering %s entry %s ' %
725 (status.lower(),
726 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000727 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000728 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000729 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000730 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000731 continue
mbligh90a549d2008-03-25 23:52:34 +0000732
showard597bfd32009-05-08 18:22:50 +0000733 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000734 run_monitor.get_process())
735 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
736 orphans.discard(run_monitor.get_process())
737
738
739 def _kill_remaining_orphan_processes(self, orphans):
740 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000741 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000742 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000743
showard170873e2009-01-07 00:22:26 +0000744
showardd3dc1992009-04-22 21:01:40 +0000745 def _recover_running_entries(self, orphans):
746 def recover_entries(job, queue_entries, run_monitor):
747 if run_monitor is not None:
748 queue_task = RecoveryQueueTask(job=job,
749 queue_entries=queue_entries,
750 run_monitor=run_monitor)
751 self.add_agent(Agent(tasks=[queue_task],
752 num_processes=len(queue_entries)))
753 # else, _requeue_other_active_entries will cover this
754
755 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
756 orphans, '.autoserv_execute',
757 recover_entries)
758
759
760 def _recover_gathering_entries(self, orphans):
761 def recover_entries(job, queue_entries, run_monitor):
762 gather_task = GatherLogsTask(job, queue_entries,
763 run_monitor=run_monitor)
764 self.add_agent(Agent([gather_task]))
765
766 self._recover_entries_with_status(
767 models.HostQueueEntry.Status.GATHERING,
768 orphans, _CRASHINFO_PID_FILE, recover_entries)
769
770
771 def _recover_parsing_entries(self, orphans):
772 def recover_entries(job, queue_entries, run_monitor):
773 reparse_task = FinalReparseTask(queue_entries,
774 run_monitor=run_monitor)
775 self.add_agent(Agent([reparse_task], num_processes=0))
776
777 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
778 orphans, _PARSER_PID_FILE,
779 recover_entries)
780
781
    def _recover_all_recoverable_entries(self):
        # Gather the set of autoserv processes with no known owner, let each
        # recovery pass reclaim (discard) the ones it recognizes, then kill
        # whatever is left -- those are truly orphaned.
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000788
showard97aed502008-11-04 02:01:24 +0000789
showard170873e2009-01-07 00:22:26 +0000790 def _requeue_other_active_entries(self):
791 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000792 where='active AND NOT complete AND '
793 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000794 for queue_entry in queue_entries:
795 if self.get_agents_for_entry(queue_entry):
796 # entry has already been recovered
797 continue
showardd3dc1992009-04-22 21:01:40 +0000798 if queue_entry.aborted:
799 queue_entry.abort(self)
800 continue
801
802 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000803 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000804 if queue_entry.host:
805 tasks = queue_entry.host.reverify_tasks()
806 self.add_agent(Agent(tasks))
807 agent = queue_entry.requeue()
808
809
showard1ff7b2e2009-05-15 23:17:18 +0000810 def _find_reverify(self):
811 self._reverify_hosts_where("status = 'Reverify'")
812
813
showard170873e2009-01-07 00:22:26 +0000814 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000815 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000816 self._reverify_hosts_where("""(status = 'Repairing' OR
817 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000818 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000819
showard170873e2009-01-07 00:22:26 +0000820 # recover "Running" hosts with no active queue entries, although this
821 # should never happen
822 message = ('Recovering running host %s - this probably indicates a '
823 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000824 self._reverify_hosts_where("""status = 'Running' AND
825 id NOT IN (SELECT host_id
826 FROM host_queue_entries
827 WHERE active)""",
828 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000829
830
jadmanski0afbb632008-06-06 21:10:57 +0000831 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000832 print_message='Reverifying host %s'):
833 full_where='locked = 0 AND invalid = 0 AND ' + where
834 for host in Host.fetch(where=full_where):
835 if self.host_has_agent(host):
836 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000837 continue
showard170873e2009-01-07 00:22:26 +0000838 if print_message:
showardb18134f2009-03-20 20:52:18 +0000839 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000840 tasks = host.reverify_tasks()
841 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000842
843
jadmanski0afbb632008-06-06 21:10:57 +0000844 def _recover_hosts(self):
845 # recover "Repair Failed" hosts
846 message = 'Reverifying dead host %s'
847 self._reverify_hosts_where("status = 'Repair Failed'",
848 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000849
850
showard04c82c52008-05-29 19:38:12 +0000851
showardb95b1bd2008-08-15 18:11:04 +0000852 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000853 # prioritize by job priority, then non-metahost over metahost, then FIFO
854 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000855 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000856 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000857 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000858
859
showard89f84db2009-03-12 20:39:13 +0000860 def _refresh_pending_queue_entries(self):
861 """
862 Lookup the pending HostQueueEntries and call our HostScheduler
863 refresh() method given that list. Return the list.
864
865 @returns A list of pending HostQueueEntries sorted in priority order.
866 """
showard63a34772008-08-18 19:32:50 +0000867 queue_entries = self._get_pending_queue_entries()
868 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000869 return []
showardb95b1bd2008-08-15 18:11:04 +0000870
showard63a34772008-08-18 19:32:50 +0000871 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000872
showard89f84db2009-03-12 20:39:13 +0000873 return queue_entries
874
875
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        # pair each entry with its assigned host and start them all running
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)
902
903
904 def _schedule_new_jobs(self):
905 queue_entries = self._refresh_pending_queue_entries()
906 if not queue_entries:
907 return
908
showard63a34772008-08-18 19:32:50 +0000909 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000910 if (queue_entry.atomic_group_id is None or
911 queue_entry.host_id is not None):
912 assigned_host = self._host_scheduler.find_eligible_host(
913 queue_entry)
914 if assigned_host:
915 self._run_queue_entry(queue_entry, assigned_host)
916 else:
917 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000918
919
920 def _run_queue_entry(self, queue_entry, host):
921 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000922 # in some cases (synchronous jobs with run_verify=False), agent may be
923 # None
showard9976ce92008-10-15 20:28:13 +0000924 if agent:
925 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000926
927
jadmanski0afbb632008-06-06 21:10:57 +0000928 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000929 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
930 for agent in self.get_agents_for_entry(entry):
931 agent.abort()
932 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000933
934
showard324bf812009-01-20 23:23:38 +0000935 def _can_start_agent(self, agent, num_started_this_cycle,
936 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000937 # always allow zero-process agents to run
938 if agent.num_processes == 0:
939 return True
940 # don't allow any nonzero-process agents to run after we've reached a
941 # limit (this avoids starvation of many-process agents)
942 if have_reached_limit:
943 return False
944 # total process throttling
showard324bf812009-01-20 23:23:38 +0000945 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000946 return False
947 # if a single agent exceeds the per-cycle throttling, still allow it to
948 # run when it's the first agent in the cycle
949 if num_started_this_cycle == 0:
950 return True
951 # per-cycle throttling
952 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000953 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000954 return False
955 return True
956
957
jadmanski0afbb632008-06-06 21:10:57 +0000958 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000959 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000960 have_reached_limit = False
961 # iterate over copy, so we can remove agents during iteration
962 for agent in list(self._agents):
963 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000964 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000965 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000966 continue
967 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000968 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000969 have_reached_limit):
970 have_reached_limit = True
971 continue
showard4c5374f2008-09-04 17:02:56 +0000972 num_started_this_cycle += agent.num_processes
973 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000974 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000975 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000976
977
showard29f7cd22009-04-29 21:16:24 +0000978 def _process_recurring_runs(self):
979 recurring_runs = models.RecurringRun.objects.filter(
980 start_date__lte=datetime.datetime.now())
981 for rrun in recurring_runs:
982 # Create job from template
983 job = rrun.job
984 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000985 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000986
987 host_objects = info['hosts']
988 one_time_hosts = info['one_time_hosts']
989 metahost_objects = info['meta_hosts']
990 dependencies = info['dependencies']
991 atomic_group = info['atomic_group']
992
993 for host in one_time_hosts or []:
994 this_host = models.Host.create_one_time_host(host.hostname)
995 host_objects.append(this_host)
996
997 try:
998 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +0000999 options=options,
showard29f7cd22009-04-29 21:16:24 +00001000 host_objects=host_objects,
1001 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001002 atomic_group=atomic_group)
1003
1004 except Exception, ex:
1005 logging.exception(ex)
1006 #TODO send email
1007
1008 if rrun.loop_count == 1:
1009 rrun.delete()
1010 else:
1011 if rrun.loop_count != 0: # if not infinite loop
1012 # calculate new start_date
1013 difference = datetime.timedelta(seconds=rrun.loop_period)
1014 rrun.start_date = rrun.start_date + difference
1015 rrun.loop_count -= 1
1016 rrun.save()
1017
1018
showard170873e2009-01-07 00:22:26 +00001019class PidfileRunMonitor(object):
1020 """
1021 Client must call either run() to start a new process or
1022 attach_to_existing_process().
1023 """
mbligh36768f02008-02-22 18:28:33 +00001024
showard170873e2009-01-07 00:22:26 +00001025 class _PidfileException(Exception):
1026 """
1027 Raised when there's some unexpected behavior with the pid file, but only
1028 used internally (never allowed to escape this class).
1029 """
mbligh36768f02008-02-22 18:28:33 +00001030
1031
showard170873e2009-01-07 00:22:26 +00001032 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001033 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001034 self._start_time = None
1035 self.pidfile_id = None
1036 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001037
1038
showard170873e2009-01-07 00:22:26 +00001039 def _add_nice_command(self, command, nice_level):
1040 if not nice_level:
1041 return command
1042 return ['nice', '-n', str(nice_level)] + command
1043
1044
1045 def _set_start_time(self):
1046 self._start_time = time.time()
1047
1048
1049 def run(self, command, working_directory, nice_level=None, log_file=None,
1050 pidfile_name=None, paired_with_pidfile=None):
1051 assert command is not None
1052 if nice_level is not None:
1053 command = ['nice', '-n', str(nice_level)] + command
1054 self._set_start_time()
1055 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001056 command, working_directory, pidfile_name=pidfile_name,
1057 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001058
1059
showardd3dc1992009-04-22 21:01:40 +00001060 def attach_to_existing_process(self, execution_tag,
1061 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001062 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001063 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1064 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001065 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001066
1067
jadmanski0afbb632008-06-06 21:10:57 +00001068 def kill(self):
showard170873e2009-01-07 00:22:26 +00001069 if self.has_process():
1070 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001071
mbligh36768f02008-02-22 18:28:33 +00001072
showard170873e2009-01-07 00:22:26 +00001073 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001074 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001075 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001076
1077
showard170873e2009-01-07 00:22:26 +00001078 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001079 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001080 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001081 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001082
1083
showard170873e2009-01-07 00:22:26 +00001084 def _read_pidfile(self, use_second_read=False):
1085 assert self.pidfile_id is not None, (
1086 'You must call run() or attach_to_existing_process()')
1087 contents = _drone_manager.get_pidfile_contents(
1088 self.pidfile_id, use_second_read=use_second_read)
1089 if contents.is_invalid():
1090 self._state = drone_manager.PidfileContents()
1091 raise self._PidfileException(contents)
1092 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001093
1094
showard21baa452008-10-21 00:08:39 +00001095 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001096 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1097 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001098 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001099 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001100 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001101
1102
1103 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001104 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001105 return
mblighbb421852008-03-11 22:36:16 +00001106
showard21baa452008-10-21 00:08:39 +00001107 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001108
showard170873e2009-01-07 00:22:26 +00001109 if self._state.process is None:
1110 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001111 return
mbligh90a549d2008-03-25 23:52:34 +00001112
showard21baa452008-10-21 00:08:39 +00001113 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001114 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001115 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001116 return
mbligh90a549d2008-03-25 23:52:34 +00001117
showard170873e2009-01-07 00:22:26 +00001118 # pid but no running process - maybe process *just* exited
1119 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001120 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001121 # autoserv exited without writing an exit code
1122 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001123 self._handle_pidfile_error(
1124 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001125
showard21baa452008-10-21 00:08:39 +00001126
1127 def _get_pidfile_info(self):
1128 """\
1129 After completion, self._state will contain:
1130 pid=None, exit_status=None if autoserv has not yet run
1131 pid!=None, exit_status=None if autoserv is running
1132 pid!=None, exit_status!=None if autoserv has completed
1133 """
1134 try:
1135 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001136 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001137 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001138
1139
showard170873e2009-01-07 00:22:26 +00001140 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001141 """\
1142 Called when no pidfile is found or no pid is in the pidfile.
1143 """
showard170873e2009-01-07 00:22:26 +00001144 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001145 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001146 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1147 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001148 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001149 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001150
1151
showard35162b02009-03-03 02:17:30 +00001152 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001153 """\
1154 Called when autoserv has exited without writing an exit status,
1155 or we've timed out waiting for autoserv to write a pid to the
1156 pidfile. In either case, we just return failure and the caller
1157 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001158
showard170873e2009-01-07 00:22:26 +00001159 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001160 """
1161 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001162 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001163 self._state.exit_status = 1
1164 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001165
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001168 self._get_pidfile_info()
1169 return self._state.exit_status
1170
1171
1172 def num_tests_failed(self):
1173 self._get_pidfile_info()
1174 assert self._state.num_tests_failed is not None
1175 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001176
1177
mbligh36768f02008-02-22 18:28:33 +00001178class Agent(object):
showard170873e2009-01-07 00:22:26 +00001179 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001180 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001181 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001182 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001183 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001184
showard170873e2009-01-07 00:22:26 +00001185 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1186 for task in tasks)
1187 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1188
showardd3dc1992009-04-22 21:01:40 +00001189 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001190 for task in tasks:
1191 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001192
1193
showardd3dc1992009-04-22 21:01:40 +00001194 def _clear_queue(self):
1195 self.queue = Queue.Queue(0)
1196
1197
showard170873e2009-01-07 00:22:26 +00001198 def _union_ids(self, id_lists):
1199 return set(itertools.chain(*id_lists))
1200
1201
jadmanski0afbb632008-06-06 21:10:57 +00001202 def add_task(self, task):
1203 self.queue.put_nowait(task)
1204 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001205
1206
jadmanski0afbb632008-06-06 21:10:57 +00001207 def tick(self):
showard21baa452008-10-21 00:08:39 +00001208 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001209 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001210 self.active_task.poll()
1211 if not self.active_task.is_done():
1212 return
1213 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001214
1215
jadmanski0afbb632008-06-06 21:10:57 +00001216 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001217 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001218 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001219 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001220 if not self.active_task.success:
1221 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001222 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001223
jadmanski0afbb632008-06-06 21:10:57 +00001224 if not self.is_done():
1225 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001226
1227
jadmanski0afbb632008-06-06 21:10:57 +00001228 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001229 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001230 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1231 # get reset.
1232 new_agent = Agent(self.active_task.failure_tasks)
1233 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001234
mblighe2586682008-02-29 22:45:46 +00001235
showard4c5374f2008-09-04 17:02:56 +00001236 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001237 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001238
1239
jadmanski0afbb632008-06-06 21:10:57 +00001240 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001241 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001242
1243
showardd3dc1992009-04-22 21:01:40 +00001244 def abort(self):
showard08a36412009-05-05 01:01:13 +00001245 # abort tasks until the queue is empty or a task ignores the abort
1246 while not self.is_done():
1247 if not self.active_task:
1248 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001249 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001250 if not self.active_task.aborted:
1251 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001252 return
1253 self.active_task = None
1254
showardd3dc1992009-04-22 21:01:40 +00001255
mbligh36768f02008-02-22 18:28:33 +00001256class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001257 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1258 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001259 self.done = False
1260 self.failure_tasks = failure_tasks
1261 self.started = False
1262 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001263 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001264 self.task = None
1265 self.agent = None
1266 self.monitor = None
1267 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001268 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001269 self.queue_entry_ids = []
1270 self.host_ids = []
1271 self.log_file = None
1272
1273
1274 def _set_ids(self, host=None, queue_entries=None):
1275 if queue_entries and queue_entries != [None]:
1276 self.host_ids = [entry.host.id for entry in queue_entries]
1277 self.queue_entry_ids = [entry.id for entry in queue_entries]
1278 else:
1279 assert host
1280 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def poll(self):
showard08a36412009-05-05 01:01:13 +00001284 if not self.started:
1285 self.start()
1286 self.tick()
1287
1288
1289 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001290 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001291 exit_code = self.monitor.exit_code()
1292 if exit_code is None:
1293 return
1294 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001295 else:
1296 success = False
mbligh36768f02008-02-22 18:28:33 +00001297
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001299
1300
jadmanski0afbb632008-06-06 21:10:57 +00001301 def is_done(self):
1302 return self.done
mbligh36768f02008-02-22 18:28:33 +00001303
1304
jadmanski0afbb632008-06-06 21:10:57 +00001305 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001306 if self.done:
1307 return
jadmanski0afbb632008-06-06 21:10:57 +00001308 self.done = True
1309 self.success = success
1310 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001311
1312
jadmanski0afbb632008-06-06 21:10:57 +00001313 def prolog(self):
1314 pass
mblighd64e5702008-04-04 21:39:28 +00001315
1316
jadmanski0afbb632008-06-06 21:10:57 +00001317 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001318 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001319
mbligh36768f02008-02-22 18:28:33 +00001320
jadmanski0afbb632008-06-06 21:10:57 +00001321 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001322 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001323 _drone_manager.copy_to_results_repository(
1324 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def epilog(self):
1328 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001329
1330
jadmanski0afbb632008-06-06 21:10:57 +00001331 def start(self):
1332 assert self.agent
1333
1334 if not self.started:
1335 self.prolog()
1336 self.run()
1337
1338 self.started = True
1339
1340
1341 def abort(self):
1342 if self.monitor:
1343 self.monitor.kill()
1344 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001345 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001346 self.cleanup()
1347
1348
showard170873e2009-01-07 00:22:26 +00001349 def set_host_log_file(self, base_name, host):
1350 filename = '%s.%s' % (time.time(), base_name)
1351 self.log_file = os.path.join('hosts', host.hostname, filename)
1352
1353
showardde634ee2009-01-30 01:44:24 +00001354 def _get_consistent_execution_tag(self, queue_entries):
1355 first_execution_tag = queue_entries[0].execution_tag()
1356 for queue_entry in queue_entries[1:]:
1357 assert queue_entry.execution_tag() == first_execution_tag, (
1358 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1359 queue_entry,
1360 first_execution_tag,
1361 queue_entries[0]))
1362 return first_execution_tag
1363
1364
showarda1e74b32009-05-12 17:32:04 +00001365 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001366 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001367 if use_monitor is None:
1368 assert self.monitor
1369 use_monitor = self.monitor
1370 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001371 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001372 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001373 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001374 results_path)
showardde634ee2009-01-30 01:44:24 +00001375
showarda1e74b32009-05-12 17:32:04 +00001376
    def _parse_results(self, queue_entries):
        """Schedule a final reparse of the results for queue_entries by
        handing a FinalReparseTask to the dispatcher (num_processes=0 since
        the parse is throttled separately)."""
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1380
1381
showarda1e74b32009-05-12 17:32:04 +00001382 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1383 self._copy_results(queue_entries, use_monitor)
1384 self._parse_results(queue_entries)
1385
1386
showardd3dc1992009-04-22 21:01:40 +00001387 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001388 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001389 self.monitor = PidfileRunMonitor()
1390 self.monitor.run(self.cmd, self._working_directory,
1391 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001392 log_file=self.log_file,
1393 pidfile_name=pidfile_name,
1394 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001395
1396
showardd9205182009-04-27 20:09:55 +00001397class TaskWithJobKeyvals(object):
1398 """AgentTask mixin providing functionality to help with job keyval files."""
1399 _KEYVAL_FILE = 'keyval'
1400 def _format_keyval(self, key, value):
1401 return '%s=%s' % (key, value)
1402
1403
1404 def _keyval_path(self):
1405 """Subclasses must override this"""
1406 raise NotImplemented
1407
1408
1409 def _write_keyval_after_job(self, field, value):
1410 assert self.monitor
1411 if not self.monitor.has_process():
1412 return
1413 _drone_manager.write_lines_to_file(
1414 self._keyval_path(), [self._format_keyval(field, value)],
1415 paired_with_process=self.monitor.get_process())
1416
1417
1418 def _job_queued_keyval(self, job):
1419 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1420
1421
1422 def _write_job_finished(self):
1423 self._write_keyval_after_job("job_finished", int(time.time()))
1424
1425
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host, optionally failing a queue entry
    (and preserving its logs) if the repair itself fails."""

    def __init__(self, host, queue_entry=None):
        """\
        host: host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # translate the host's protection level into the attribute-name form
        # that autoserv expects on its command line
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        """Mark the host Repairing and requeue the entry we might fail."""
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals live in this task's temporary results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """
        Fail self.queue_entry_to_fail after an unsuccessful repair, copying
        this task's logs into the entry's execution directory so the user can
        see why the host was rejected.
        """
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # parsing of failed-repair results is optional, controlled per-job
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        """Record the host's final state; on failure, fail the queue entry."""
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001496
1497
showard8fe93b52008-11-18 17:53:22 +00001498class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001499 def epilog(self):
1500 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001501 should_copy_results = (self.queue_entry and not self.success
1502 and not self.queue_entry.meta_host)
1503 if should_copy_results:
1504 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001505 destination = os.path.join(self.queue_entry.execution_tag(),
1506 os.path.basename(self.log_file))
1507 _drone_manager.copy_to_results_repository(
1508 self.monitor.get_process(), self.log_file,
1509 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001510
1511
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify a host before a job; on failure a
    RepairTask is scheduled via failure_tasks."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry/host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        cmd = _autoserv_command_line(self.host.hostname, self.temp_results_dir,
                                     ['-v'], queue_entry=queue_entry)
        # if verify fails, repair the host (and possibly fail the entry)
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark both the queue entry (if any) and the host as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success mark the host Ready; failure handling is done by the
        RepairTask registered in failure_tasks."""
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001542
1543
showardd9205182009-04-27 20:09:55 +00001544class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001545 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001546 self.job = job
1547 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001548 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001549 super(QueueTask, self).__init__(cmd, self._execution_tag())
1550 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001551
1552
showard73ec0442009-02-07 02:05:20 +00001553 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001554 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001555
1556
1557 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1558 keyval_contents = '\n'.join(self._format_keyval(key, value)
1559 for key, value in keyval_dict.iteritems())
1560 # always end with a newline to allow additional keyvals to be written
1561 keyval_contents += '\n'
1562 _drone_manager.attach_file_to_execution(self._execution_tag(),
1563 keyval_contents,
1564 file_path=keyval_path)
1565
1566
1567 def _write_keyvals_before_job(self, keyval_dict):
1568 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1569
1570
showard170873e2009-01-07 00:22:26 +00001571 def _write_host_keyvals(self, host):
1572 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1573 host.hostname)
1574 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001575 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1576 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001577
1578
showard170873e2009-01-07 00:22:26 +00001579 def _execution_tag(self):
1580 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001581
1582
jadmanski0afbb632008-06-06 21:10:57 +00001583 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001584 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001585 keyval_dict = {queued_key: queued_time}
1586 if self.group_name:
1587 keyval_dict['host_group_name'] = self.group_name
1588 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001589 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001590 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001591 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001592 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001593 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001594 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001595 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001596 assert len(self.queue_entries) == 1
1597 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001598
1599
showard35162b02009-03-03 02:17:30 +00001600 def _write_lost_process_error_file(self):
1601 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1602 _drone_manager.write_lines_to_file(error_file_path,
1603 [_LOST_PROCESS_ERROR])
1604
1605
showardd3dc1992009-04-22 21:01:40 +00001606 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001607 if not self.monitor:
1608 return
1609
showardd9205182009-04-27 20:09:55 +00001610 self._write_job_finished()
1611
showardd3dc1992009-04-22 21:01:40 +00001612 # both of these conditionals can be true, iff the process ran, wrote a
1613 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001614 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001615 gather_task = GatherLogsTask(self.job, self.queue_entries)
1616 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001617
1618 if self.monitor.lost_process:
1619 self._write_lost_process_error_file()
1620 for queue_entry in self.queue_entries:
1621 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001622
1623
showardcbd74612008-11-19 21:42:02 +00001624 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001625 _drone_manager.write_lines_to_file(
1626 os.path.join(self._execution_tag(), 'status.log'),
1627 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001628 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001629
1630
jadmanskif7fa2cc2008-10-01 14:13:23 +00001631 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001632 if not self.monitor or not self.monitor.has_process():
1633 return
1634
jadmanskif7fa2cc2008-10-01 14:13:23 +00001635 # build up sets of all the aborted_by and aborted_on values
1636 aborted_by, aborted_on = set(), set()
1637 for queue_entry in self.queue_entries:
1638 if queue_entry.aborted_by:
1639 aborted_by.add(queue_entry.aborted_by)
1640 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1641 aborted_on.add(t)
1642
1643 # extract some actual, unique aborted by value and write it out
1644 assert len(aborted_by) <= 1
1645 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001646 aborted_by_value = aborted_by.pop()
1647 aborted_on_value = max(aborted_on)
1648 else:
1649 aborted_by_value = 'autotest_system'
1650 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001651
showarda0382352009-02-11 23:36:43 +00001652 self._write_keyval_after_job("aborted_by", aborted_by_value)
1653 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001654
showardcbd74612008-11-19 21:42:02 +00001655 aborted_on_string = str(datetime.datetime.fromtimestamp(
1656 aborted_on_value))
1657 self._write_status_comment('Job aborted by %s on %s' %
1658 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001659
1660
jadmanski0afbb632008-06-06 21:10:57 +00001661 def abort(self):
1662 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001663 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001664 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001665
1666
jadmanski0afbb632008-06-06 21:10:57 +00001667 def epilog(self):
1668 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001669 self._finish_task()
1670 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001671
1672
mblighbb421852008-03-11 22:36:16 +00001673class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001674 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001675 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001676 self.monitor = run_monitor
1677 self.started = True
1678 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001679
1680
jadmanski0afbb632008-06-06 21:10:57 +00001681 def run(self):
showard5add1c82009-05-26 19:27:46 +00001682 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001683
1684
jadmanski0afbb632008-06-06 21:10:57 +00001685 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001686 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001687
1688
showardd3dc1992009-04-22 21:01:40 +00001689class PostJobTask(AgentTask):
1690 def __init__(self, queue_entries, pidfile_name, logfile_name,
1691 run_monitor=None):
1692 """
1693 If run_monitor != None, we're recovering a running task.
1694 """
1695 self._queue_entries = queue_entries
1696 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001697
1698 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1699 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1700 self._autoserv_monitor = PidfileRunMonitor()
1701 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1702 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1703
1704 if _testing_mode:
1705 command = 'true'
1706 else:
1707 command = self._generate_command(self._results_dir)
1708
1709 super(PostJobTask, self).__init__(cmd=command,
1710 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001711 # this must happen *after* the super call
1712 self.monitor = run_monitor
1713 if run_monitor:
1714 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001715
1716 self.log_file = os.path.join(self._execution_tag, logfile_name)
1717 self._final_status = self._determine_final_status()
1718
1719
1720 def _generate_command(self, results_dir):
1721 raise NotImplementedError('Subclasses must override this')
1722
1723
1724 def _job_was_aborted(self):
1725 was_aborted = None
1726 for queue_entry in self._queue_entries:
1727 queue_entry.update_from_database()
1728 if was_aborted is None: # first queue entry
1729 was_aborted = bool(queue_entry.aborted)
1730 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1731 email_manager.manager.enqueue_notify_email(
1732 'Inconsistent abort state',
1733 'Queue entries have inconsistent abort state: ' +
1734 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1735 # don't crash here, just assume true
1736 return True
1737 return was_aborted
1738
1739
1740 def _determine_final_status(self):
1741 if self._job_was_aborted():
1742 return models.HostQueueEntry.Status.ABORTED
1743
1744 # we'll use a PidfileRunMonitor to read the autoserv exit status
1745 if self._autoserv_monitor.exit_code() == 0:
1746 return models.HostQueueEntry.Status.COMPLETED
1747 return models.HostQueueEntry.Status.FAILED
1748
1749
1750 def run(self):
showard5add1c82009-05-26 19:27:46 +00001751 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001752
showard5add1c82009-05-26 19:27:46 +00001753 # make sure we actually have results to work with.
1754 # this should never happen in normal operation.
1755 if not self._autoserv_monitor.has_process():
1756 email_manager.manager.enqueue_notify_email(
1757 'No results in post-job task',
1758 'No results in post-job task at %s' %
1759 self._autoserv_monitor.pidfile_id)
1760 self.finished(False)
1761 return
1762
1763 super(PostJobTask, self).run(
1764 pidfile_name=self._pidfile_name,
1765 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001766
1767
1768 def _set_all_statuses(self, status):
1769 for queue_entry in self._queue_entries:
1770 queue_entry.set_status(status)
1771
1772
1773 def abort(self):
1774 # override AgentTask.abort() to avoid killing the process and ending
1775 # the task. post-job tasks continue when the job is aborted.
1776 pass
1777
1778
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command over all hosts."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release the job's hosts, honoring the
        job's reboot_after policy; aborted jobs always trigger cleanup."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        """Copy + parse results (when the original process exists), then
        handle host reboots."""
        super(GatherLogsTask, self).epilog()
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally; crashinfo collection isn't needed
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001846
1847
showard8fe93b52008-11-18 17:53:22 +00001848class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001849 def __init__(self, host=None, queue_entry=None):
1850 assert bool(host) ^ bool(queue_entry)
1851 if queue_entry:
1852 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001853 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001854 self.host = host
showard170873e2009-01-07 00:22:26 +00001855
1856 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001857 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1858 ['--cleanup'],
1859 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001860 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001861 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1862 failure_tasks=[repair_task])
1863
1864 self._set_ids(host=host, queue_entries=[queue_entry])
1865 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001866
mblighd5c95802008-03-05 00:33:46 +00001867
jadmanski0afbb632008-06-06 21:10:57 +00001868 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001869 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001870 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001871 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001872
mblighd5c95802008-03-05 00:33:46 +00001873
showard21baa452008-10-21 00:08:39 +00001874 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001875 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001876 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001877 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001878 self.host.update_field('dirty', 0)
1879
1880
showardd3dc1992009-04-22 21:01:40 +00001881class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001882 _num_running_parses = 0
1883
showardd3dc1992009-04-22 21:01:40 +00001884 def __init__(self, queue_entries, run_monitor=None):
1885 super(FinalReparseTask, self).__init__(queue_entries,
1886 pidfile_name=_PARSER_PID_FILE,
1887 logfile_name='.parse.log',
1888 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001889 # don't use _set_ids, since we don't want to set the host_ids
1890 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001891 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001892
showard97aed502008-11-04 02:01:24 +00001893
1894 @classmethod
1895 def _increment_running_parses(cls):
1896 cls._num_running_parses += 1
1897
1898
1899 @classmethod
1900 def _decrement_running_parses(cls):
1901 cls._num_running_parses -= 1
1902
1903
1904 @classmethod
1905 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001906 return (cls._num_running_parses <
1907 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001908
1909
1910 def prolog(self):
1911 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001912 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001913
1914
1915 def epilog(self):
1916 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001917 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001918
1919
showardd3dc1992009-04-22 21:01:40 +00001920 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00001921 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00001922 results_dir]
showard97aed502008-11-04 02:01:24 +00001923
1924
showard08a36412009-05-05 01:01:13 +00001925 def tick(self):
1926 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001927 # and we can, at which point we revert to default behavior
1928 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001929 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001930 else:
1931 self._try_starting_parse()
1932
1933
1934 def run(self):
1935 # override run() to not actually run unless we can
1936 self._try_starting_parse()
1937
1938
1939 def _try_starting_parse(self):
1940 if not self._can_run_new_parse():
1941 return
showard170873e2009-01-07 00:22:26 +00001942
showard97aed502008-11-04 02:01:24 +00001943 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001944 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001945
showard97aed502008-11-04 02:01:24 +00001946 self._increment_running_parses()
1947 self._parse_started = True
1948
1949
1950 def finished(self, success):
1951 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001952 if self._parse_started:
1953 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001954
1955
showardc9ae1782009-01-30 01:42:37 +00001956class SetEntryPendingTask(AgentTask):
1957 def __init__(self, queue_entry):
1958 super(SetEntryPendingTask, self).__init__(cmd='')
1959 self._queue_entry = queue_entry
1960 self._set_ids(queue_entries=[queue_entry])
1961
1962
1963 def run(self):
1964 agent = self._queue_entry.on_pending()
1965 if agent:
1966 self.agent.dispatcher.add_agent(agent)
1967 self.finished(True)
1968
1969
showarda3c58572009-03-12 20:36:59 +00001970class DBError(Exception):
1971 """Raised by the DBObject constructor when its select fails."""
1972
1973
mbligh36768f02008-02-22 18:28:33 +00001974class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001975 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001976
1977 # Subclasses MUST override these:
1978 _table_name = ''
1979 _fields = ()
1980
showarda3c58572009-03-12 20:36:59 +00001981 # A mapping from (type, id) to the instance of the object for that
1982 # particular id. This prevents us from creating new Job() and Host()
1983 # instances for every HostQueueEntry object that we instantiate as
1984 # multiple HQEs often share the same Job.
1985 _instances_by_type_and_id = weakref.WeakValueDictionary()
1986 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001987
showarda3c58572009-03-12 20:36:59 +00001988
    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.

        This makes, e.g., multiple HostQueueEntries referencing the same Job
        share one Job instance (see _instances_by_type_and_id).
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # no cached instance (or constructing by row): create a fresh object
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1999
2000
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - primary key of the row to load (mutually exclusive with
                row).
        @param row - a full result row to populate fields from without
                querying (mutually exclusive with id).
        @param new_record - True if this object represents a row that does
                not yet exist in the database.
        @param always_query - if False and this (cached) instance was already
                initialized, skip re-reading from the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a requery on a cached instance: warn if the DB row has drifted
            # from what we hold in memory before updating our fields
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2029
2030
    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()
2035
2036
showardccbd6c52009-03-21 00:10:21 +00002037 def _fetch_row_from_db(self, row_id):
2038 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2039 rows = _db.execute(sql, (row_id,))
2040 if not rows:
showard76e29d12009-04-15 21:53:10 +00002041 raise DBError("row not found (table=%s, row id=%s)"
2042 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002043 return rows[0]
2044
2045
showarda3c58572009-03-12 20:36:59 +00002046 def _assert_row_length(self, row):
2047 assert len(row) == len(self._fields), (
2048 "table = %s, row = %s/%d, fields = %s/%d" % (
2049 self.__table, row, len(row), self._fields, len(self._fields)))
2050
2051
2052 def _compare_fields_in_row(self, row):
2053 """
2054 Given a row as returned by a SELECT query, compare it to our existing
2055 in memory fields.
2056
2057 @param row - A sequence of values corresponding to fields named in
2058 The class attribute _fields.
2059
2060 @returns A dictionary listing the differences keyed by field name
2061 containing tuples of (current_value, row_value).
2062 """
2063 self._assert_row_length(row)
2064 differences = {}
2065 for field, row_value in itertools.izip(self._fields, row):
2066 current_value = getattr(self, field)
2067 if current_value != row_value:
2068 differences[field] = (current_value, row_value)
2069 return differences
showard2bab8f42008-11-12 18:15:22 +00002070
2071
2072 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002073 """
2074 Update our field attributes using a single row returned by SELECT.
2075
2076 @param row - A sequence of values corresponding to fields named in
2077 the class fields list.
2078 """
2079 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002080
showard2bab8f42008-11-12 18:15:22 +00002081 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002082 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002083 setattr(self, field, value)
2084 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002085
showard2bab8f42008-11-12 18:15:22 +00002086 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002087
mblighe2586682008-02-29 22:45:46 +00002088
showardccbd6c52009-03-21 00:10:21 +00002089 def update_from_database(self):
2090 assert self.id is not None
2091 row = self._fetch_row_from_db(self.id)
2092 self._update_fields_from_row(row)
2093
2094
jadmanski0afbb632008-06-06 21:10:57 +00002095 def count(self, where, table = None):
2096 if not table:
2097 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002098
jadmanski0afbb632008-06-06 21:10:57 +00002099 rows = _db.execute("""
2100 SELECT count(*) FROM %s
2101 WHERE %s
2102 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002103
jadmanski0afbb632008-06-06 21:10:57 +00002104 assert len(rows) == 1
2105
2106 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002107
2108
showardd3dc1992009-04-22 21:01:40 +00002109 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002110 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002111
showard2bab8f42008-11-12 18:15:22 +00002112 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002113 return
mbligh36768f02008-02-22 18:28:33 +00002114
mblighf8c624d2008-07-03 16:58:45 +00002115 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002116 _db.execute(query, (value, self.id))
2117
showard2bab8f42008-11-12 18:15:22 +00002118 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002119
2120
jadmanski0afbb632008-06-06 21:10:57 +00002121 def save(self):
2122 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002123 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002124 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002125 values = []
2126 for key in keys:
2127 value = getattr(self, key)
2128 if value is None:
2129 values.append('NULL')
2130 else:
2131 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002132 values_str = ','.join(values)
2133 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2134 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002135 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002136 # Update our id to the one the database just assigned to us.
2137 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002138
2139
jadmanski0afbb632008-06-06 21:10:57 +00002140 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002141 self._instances_by_type_and_id.pop((type(self), id), None)
2142 self._initialized = False
2143 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002144 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2145 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002146
2147
showard63a34772008-08-18 19:32:50 +00002148 @staticmethod
2149 def _prefix_with(string, prefix):
2150 if string:
2151 string = prefix + string
2152 return string
2153
2154
jadmanski0afbb632008-06-06 21:10:57 +00002155 @classmethod
showard989f25d2008-10-01 11:38:11 +00002156 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002157 """
2158 Construct instances of our class based on the given database query.
2159
2160 @yields One class instance for each row fetched.
2161 """
showard63a34772008-08-18 19:32:50 +00002162 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2163 where = cls._prefix_with(where, 'WHERE ')
2164 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002165 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002166 'joins' : joins,
2167 'where' : where,
2168 'order_by' : order_by})
2169 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002170 for row in rows:
2171 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002172
mbligh36768f02008-02-22 18:28:33 +00002173
class IneligibleHostQueue(DBObject):
    # A (job, host) block row: while present, the host is not eligible to be
    # scheduled again for that job (see HostQueueEntry.block_host).
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002177
2178
showard89f84db2009-03-12 20:39:13 +00002179class AtomicGroup(DBObject):
2180 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002181 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2182 'invalid')
showard89f84db2009-03-12 20:39:13 +00002183
2184
showard989f25d2008-10-01 11:38:11 +00002185class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002186 _table_name = 'labels'
2187 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002188 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002189
2190
mbligh36768f02008-02-22 18:28:33 +00002191class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002192 _table_name = 'hosts'
2193 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2194 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2195
2196
jadmanski0afbb632008-06-06 21:10:57 +00002197 def current_task(self):
2198 rows = _db.execute("""
2199 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2200 """, (self.id,))
2201
2202 if len(rows) == 0:
2203 return None
2204 else:
2205 assert len(rows) == 1
2206 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002207 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002208
2209
jadmanski0afbb632008-06-06 21:10:57 +00002210 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002211 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002212 if self.current_task():
2213 self.current_task().requeue()
2214
showard6ae5ea92009-02-25 00:11:51 +00002215
jadmanski0afbb632008-06-06 21:10:57 +00002216 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002217 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002218 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002219
2220
showard170873e2009-01-07 00:22:26 +00002221 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002222 """
showard170873e2009-01-07 00:22:26 +00002223 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002224 """
2225 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002226 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002227 FROM labels
2228 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002229 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002230 ORDER BY labels.name
2231 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002232 platform = None
2233 all_labels = []
2234 for label_name, is_platform in rows:
2235 if is_platform:
2236 platform = label_name
2237 all_labels.append(label_name)
2238 return platform, all_labels
2239
2240
2241 def reverify_tasks(self):
2242 cleanup_task = CleanupTask(host=self)
2243 verify_task = VerifyTask(host=self)
2244 # just to make sure this host does not get taken away
2245 self.set_status('Cleaning')
2246 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002247
2248
showard54c1ea92009-05-20 00:32:58 +00002249 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2250
2251
2252 @classmethod
2253 def cmp_for_sort(cls, a, b):
2254 """
2255 A comparison function for sorting Host objects by hostname.
2256
2257 This strips any trailing numeric digits, ignores leading 0s and
2258 compares hostnames by the leading name and the trailing digits as a
2259 number. If both hostnames do not match this pattern, they are simply
2260 compared as lower case strings.
2261
2262 Example of how hostnames will be sorted:
2263
2264 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2265
2266 This hopefully satisfy most people's hostname sorting needs regardless
2267 of their exact naming schemes. Nobody sane should have both a host10
2268 and host010 (but the algorithm works regardless).
2269 """
2270 lower_a = a.hostname.lower()
2271 lower_b = b.hostname.lower()
2272 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2273 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2274 if match_a and match_b:
2275 name_a, number_a_str = match_a.groups()
2276 name_b, number_b_str = match_b.groups()
2277 number_a = int(number_a_str.lstrip('0'))
2278 number_b = int(number_b_str.lstrip('0'))
2279 result = cmp((name_a, number_a), (name_b, number_b))
2280 if result == 0 and lower_a != lower_b:
2281 # If they compared equal above but the lower case names are
2282 # indeed different, don't report equality. abc012 != abc12.
2283 return cmp(lower_a, lower_b)
2284 return result
2285 else:
2286 return cmp(lower_a, lower_b)
2287
2288
mbligh36768f02008-02-22 18:28:33 +00002289class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002290 _table_name = 'host_queue_entries'
2291 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002292 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002293 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002294
2295
showarda3c58572009-03-12 20:36:59 +00002296 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002297 assert id or row
showarda3c58572009-03-12 20:36:59 +00002298 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002299 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002300
jadmanski0afbb632008-06-06 21:10:57 +00002301 if self.host_id:
2302 self.host = Host(self.host_id)
2303 else:
2304 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002305
showard170873e2009-01-07 00:22:26 +00002306 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002307 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002308
2309
showard89f84db2009-03-12 20:39:13 +00002310 @classmethod
2311 def clone(cls, template):
2312 """
2313 Creates a new row using the values from a template instance.
2314
2315 The new instance will not exist in the database or have a valid
2316 id attribute until its save() method is called.
2317 """
2318 assert isinstance(template, cls)
2319 new_row = [getattr(template, field) for field in cls._fields]
2320 clone = cls(row=new_row, new_record=True)
2321 clone.id = None
2322 return clone
2323
2324
showardc85c21b2008-11-24 22:17:37 +00002325 def _view_job_url(self):
2326 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2327
2328
showardf1ae3542009-05-11 19:26:02 +00002329 def get_labels(self):
2330 """
2331 Get all labels associated with this host queue entry (either via the
2332 meta_host or as a job dependency label). The labels yielded are not
2333 guaranteed to be unique.
2334
2335 @yields Label instances associated with this host_queue_entry.
2336 """
2337 if self.meta_host:
2338 yield Label(id=self.meta_host, always_query=False)
2339 labels = Label.fetch(
2340 joins="JOIN jobs_dependency_labels AS deps "
2341 "ON (labels.id = deps.label_id)",
2342 where="deps.job_id = %d" % self.job.id)
2343 for label in labels:
2344 yield label
2345
2346
jadmanski0afbb632008-06-06 21:10:57 +00002347 def set_host(self, host):
2348 if host:
2349 self.queue_log_record('Assigning host ' + host.hostname)
2350 self.update_field('host_id', host.id)
2351 self.update_field('active', True)
2352 self.block_host(host.id)
2353 else:
2354 self.queue_log_record('Releasing host')
2355 self.unblock_host(self.host.id)
2356 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002357
jadmanski0afbb632008-06-06 21:10:57 +00002358 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002359
2360
jadmanski0afbb632008-06-06 21:10:57 +00002361 def get_host(self):
2362 return self.host
mbligh36768f02008-02-22 18:28:33 +00002363
2364
jadmanski0afbb632008-06-06 21:10:57 +00002365 def queue_log_record(self, log_line):
2366 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002367 _drone_manager.write_lines_to_file(self.queue_log_path,
2368 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002369
2370
jadmanski0afbb632008-06-06 21:10:57 +00002371 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002372 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002373 row = [0, self.job.id, host_id]
2374 block = IneligibleHostQueue(row=row, new_record=True)
2375 block.save()
mblighe2586682008-02-29 22:45:46 +00002376
2377
jadmanski0afbb632008-06-06 21:10:57 +00002378 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002379 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002380 blocks = IneligibleHostQueue.fetch(
2381 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2382 for block in blocks:
2383 block.delete()
mblighe2586682008-02-29 22:45:46 +00002384
2385
showard2bab8f42008-11-12 18:15:22 +00002386 def set_execution_subdir(self, subdir=None):
2387 if subdir is None:
2388 assert self.get_host()
2389 subdir = self.get_host().hostname
2390 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002391
2392
showard6355f6b2008-12-05 18:52:13 +00002393 def _get_hostname(self):
2394 if self.host:
2395 return self.host.hostname
2396 return 'no host'
2397
2398
showard170873e2009-01-07 00:22:26 +00002399 def __str__(self):
2400 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2401
2402
jadmanski0afbb632008-06-06 21:10:57 +00002403 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002404 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002405
showardb18134f2009-03-20 20:52:18 +00002406 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002407
showardc85c21b2008-11-24 22:17:37 +00002408 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002409 self.update_field('complete', False)
2410 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002411
jadmanski0afbb632008-06-06 21:10:57 +00002412 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002413 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002414 self.update_field('complete', False)
2415 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002416
showardc85c21b2008-11-24 22:17:37 +00002417 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002418 self.update_field('complete', True)
2419 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002420
2421 should_email_status = (status.lower() in _notify_email_statuses or
2422 'all' in _notify_email_statuses)
2423 if should_email_status:
2424 self._email_on_status(status)
2425
2426 self._email_on_job_complete()
2427
2428
2429 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002430 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002431
2432 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2433 self.job.id, self.job.name, hostname, status)
2434 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2435 self.job.id, self.job.name, hostname, status,
2436 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002437 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002438
2439
2440 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002441 if not self.job.is_finished():
2442 return
showard542e8402008-09-19 20:16:18 +00002443
showardc85c21b2008-11-24 22:17:37 +00002444 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002445 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002446 for queue_entry in hosts_queue:
2447 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002448 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002449 queue_entry.status))
2450
2451 summary_text = "\n".join(summary_text)
2452 status_counts = models.Job.objects.get_status_counts(
2453 [self.job.id])[self.job.id]
2454 status = ', '.join('%d %s' % (count, status) for status, count
2455 in status_counts.iteritems())
2456
2457 subject = 'Autotest: Job ID: %s "%s" %s' % (
2458 self.job.id, self.job.name, status)
2459 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2460 self.job.id, self.job.name, status, self._view_job_url(),
2461 summary_text)
showard170873e2009-01-07 00:22:26 +00002462 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002463
2464
showard89f84db2009-03-12 20:39:13 +00002465 def run(self, assigned_host=None):
2466 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002467 assert assigned_host
2468 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002469 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002470
showardb18134f2009-03-20 20:52:18 +00002471 logging.info("%s/%s/%s scheduled on %s, status=%s",
2472 self.job.name, self.meta_host, self.atomic_group_id,
2473 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002474
jadmanski0afbb632008-06-06 21:10:57 +00002475 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002476
showard6ae5ea92009-02-25 00:11:51 +00002477
jadmanski0afbb632008-06-06 21:10:57 +00002478 def requeue(self):
2479 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002480 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002481 # verify/cleanup failure sets the execution subdir, so reset it here
2482 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002483 if self.meta_host:
2484 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002485
2486
jadmanski0afbb632008-06-06 21:10:57 +00002487 def handle_host_failure(self):
2488 """\
2489 Called when this queue entry's host has failed verification and
2490 repair.
2491 """
2492 assert not self.meta_host
2493 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002494 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002495
2496
jadmanskif7fa2cc2008-10-01 14:13:23 +00002497 @property
2498 def aborted_by(self):
2499 self._load_abort_info()
2500 return self._aborted_by
2501
2502
2503 @property
2504 def aborted_on(self):
2505 self._load_abort_info()
2506 return self._aborted_on
2507
2508
2509 def _load_abort_info(self):
2510 """ Fetch info about who aborted the job. """
2511 if hasattr(self, "_aborted_by"):
2512 return
2513 rows = _db.execute("""
2514 SELECT users.login, aborted_host_queue_entries.aborted_on
2515 FROM aborted_host_queue_entries
2516 INNER JOIN users
2517 ON users.id = aborted_host_queue_entries.aborted_by_id
2518 WHERE aborted_host_queue_entries.queue_entry_id = %s
2519 """, (self.id,))
2520 if rows:
2521 self._aborted_by, self._aborted_on = rows[0]
2522 else:
2523 self._aborted_by = self._aborted_on = None
2524
2525
showardb2e2c322008-10-14 17:33:55 +00002526 def on_pending(self):
2527 """
2528 Called when an entry in a synchronous job has passed verify. If the
2529 job is ready to run, returns an agent to run the job. Returns None
2530 otherwise.
2531 """
2532 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002533 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002534 if self.job.is_ready():
2535 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002536 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002537 return None
2538
2539
showardd3dc1992009-04-22 21:01:40 +00002540 def abort(self, dispatcher):
2541 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002542
showardd3dc1992009-04-22 21:01:40 +00002543 Status = models.HostQueueEntry.Status
2544 has_running_job_agent = (
2545 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2546 and dispatcher.get_agents_for_entry(self))
2547 if has_running_job_agent:
2548 # do nothing; post-job tasks will finish and then mark this entry
2549 # with status "Aborted" and take care of the host
2550 return
2551
2552 if self.status in (Status.STARTING, Status.PENDING):
2553 self.host.set_status(models.Host.Status.READY)
2554 elif self.status == Status.VERIFYING:
2555 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2556
2557 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002558
2559 def execution_tag(self):
2560 assert self.execution_subdir
2561 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002562
2563
mbligh36768f02008-02-22 18:28:33 +00002564class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002565 _table_name = 'jobs'
2566 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2567 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002568 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002569 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002570
2571
showarda3c58572009-03-12 20:36:59 +00002572 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002573 assert id or row
showarda3c58572009-03-12 20:36:59 +00002574 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002575
mblighe2586682008-02-29 22:45:46 +00002576
jadmanski0afbb632008-06-06 21:10:57 +00002577 def is_server_job(self):
2578 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002579
2580
showard170873e2009-01-07 00:22:26 +00002581 def tag(self):
2582 return "%s-%s" % (self.id, self.owner)
2583
2584
jadmanski0afbb632008-06-06 21:10:57 +00002585 def get_host_queue_entries(self):
2586 rows = _db.execute("""
2587 SELECT * FROM host_queue_entries
2588 WHERE job_id= %s
2589 """, (self.id,))
2590 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002591
jadmanski0afbb632008-06-06 21:10:57 +00002592 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002593
jadmanski0afbb632008-06-06 21:10:57 +00002594 return entries
mbligh36768f02008-02-22 18:28:33 +00002595
2596
jadmanski0afbb632008-06-06 21:10:57 +00002597 def set_status(self, status, update_queues=False):
2598 self.update_field('status',status)
2599
2600 if update_queues:
2601 for queue_entry in self.get_host_queue_entries():
2602 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002603
2604
jadmanski0afbb632008-06-06 21:10:57 +00002605 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002606 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2607 status='Pending')
2608 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002609
2610
jadmanski0afbb632008-06-06 21:10:57 +00002611 def num_machines(self, clause = None):
2612 sql = "job_id=%s" % self.id
2613 if clause:
2614 sql += " AND (%s)" % clause
2615 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002616
2617
jadmanski0afbb632008-06-06 21:10:57 +00002618 def num_queued(self):
2619 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002620
2621
jadmanski0afbb632008-06-06 21:10:57 +00002622 def num_active(self):
2623 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002624
2625
jadmanski0afbb632008-06-06 21:10:57 +00002626 def num_complete(self):
2627 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002628
2629
jadmanski0afbb632008-06-06 21:10:57 +00002630 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002631 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002632
mbligh36768f02008-02-22 18:28:33 +00002633
showard6bb7c292009-01-30 01:44:51 +00002634 def _not_yet_run_entries(self, include_verifying=True):
2635 statuses = [models.HostQueueEntry.Status.QUEUED,
2636 models.HostQueueEntry.Status.PENDING]
2637 if include_verifying:
2638 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2639 return models.HostQueueEntry.objects.filter(job=self.id,
2640 status__in=statuses)
2641
2642
2643 def _stop_all_entries(self):
2644 entries_to_stop = self._not_yet_run_entries(
2645 include_verifying=False)
2646 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002647 assert not child_entry.complete, (
2648 '%s status=%s, active=%s, complete=%s' %
2649 (child_entry.id, child_entry.status, child_entry.active,
2650 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002651 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2652 child_entry.host.status = models.Host.Status.READY
2653 child_entry.host.save()
2654 child_entry.status = models.HostQueueEntry.Status.STOPPED
2655 child_entry.save()
2656
showard2bab8f42008-11-12 18:15:22 +00002657 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002658 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002659 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002660 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002661
2662
jadmanski0afbb632008-06-06 21:10:57 +00002663 def write_to_machines_file(self, queue_entry):
2664 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002665 file_path = os.path.join(self.tag(), '.machines')
2666 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002667
2668
showardf1ae3542009-05-11 19:26:02 +00002669 def _next_group_name(self, group_name=''):
2670 """@returns a directory name to use for the next host group results."""
2671 if group_name:
2672 # Sanitize for use as a pathname.
2673 group_name = group_name.replace(os.path.sep, '_')
2674 if group_name.startswith('.'):
2675 group_name = '_' + group_name[1:]
2676 # Add a separator between the group name and 'group%d'.
2677 group_name += '.'
2678 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002679 query = models.HostQueueEntry.objects.filter(
2680 job=self.id).values('execution_subdir').distinct()
2681 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002682 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2683 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002684 if ids:
2685 next_id = max(ids) + 1
2686 else:
2687 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002688 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002689
2690
showard170873e2009-01-07 00:22:26 +00002691 def _write_control_file(self, execution_tag):
2692 control_path = _drone_manager.attach_file_to_execution(
2693 execution_tag, self.control_file)
2694 return control_path
mbligh36768f02008-02-22 18:28:33 +00002695
showardb2e2c322008-10-14 17:33:55 +00002696
showard2bab8f42008-11-12 18:15:22 +00002697 def get_group_entries(self, queue_entry_from_group):
2698 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002699 return list(HostQueueEntry.fetch(
2700 where='job_id=%s AND execution_subdir=%s',
2701 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002702
2703
showardb2e2c322008-10-14 17:33:55 +00002704 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002705 assert queue_entries
2706 execution_tag = queue_entries[0].execution_tag()
2707 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002708 hostnames = ','.join([entry.get_host().hostname
2709 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002710
showard87ba02a2009-04-20 19:37:32 +00002711 params = _autoserv_command_line(
2712 hostnames, execution_tag,
2713 ['-P', execution_tag, '-n',
2714 _drone_manager.absolute_path(control_path)],
2715 job=self)
mbligh36768f02008-02-22 18:28:33 +00002716
jadmanski0afbb632008-06-06 21:10:57 +00002717 if not self.is_server_job():
2718 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002719
showardb2e2c322008-10-14 17:33:55 +00002720 return params
mblighe2586682008-02-29 22:45:46 +00002721
mbligh36768f02008-02-22 18:28:33 +00002722
showardc9ae1782009-01-30 01:42:37 +00002723 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002724 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002725 return True
showard0fc38302008-10-23 00:44:07 +00002726 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002727 return queue_entry.get_host().dirty
2728 return False
showard21baa452008-10-21 00:08:39 +00002729
showardc9ae1782009-01-30 01:42:37 +00002730
2731 def _should_run_verify(self, queue_entry):
2732 do_not_verify = (queue_entry.host.protection ==
2733 host_protections.Protection.DO_NOT_VERIFY)
2734 if do_not_verify:
2735 return False
2736 return self.run_verify
2737
2738
2739 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002740 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002741 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002742 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002743 if self._should_run_verify(queue_entry):
2744 tasks.append(VerifyTask(queue_entry=queue_entry))
2745 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002746 return tasks
2747
2748
showardf1ae3542009-05-11 19:26:02 +00002749 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002750 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002751 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002752 else:
showardf1ae3542009-05-11 19:26:02 +00002753 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002754 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002755 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002756 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002757
2758 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002759 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002760
2761
    def _choose_group_to_run(self, include_queue_entry):
        """
        Select the group of Pending queue entries to run together.

        @param include_queue_entry: A HostQueueEntry that must be part of the
                chosen group (it is the entry that triggered this attempt).

        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        # If the triggering entry belongs to an atomic group, the whole group
        # is scheduled as a unit and caps how many entries we take.
        if include_queue_entry.atomic_group_id:
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # Without an atomic group, the job's synch_count decides how many
        # hosts run together.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            # Pull the other Pending entries of this job from the database.
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002811
2812
2813 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002814 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002815 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2816 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002817
showardf1ae3542009-05-11 19:26:02 +00002818 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2819 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002820
2821
showardf1ae3542009-05-11 19:26:02 +00002822 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002823 for queue_entry in queue_entries:
2824 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002825 params = self._get_autoserv_params(queue_entries)
2826 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002827 cmd=params, group_name=group_name)
2828 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002829 entry_ids = [entry.id for entry in queue_entries]
2830
showard170873e2009-01-07 00:22:26 +00002831 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002832
2833
if __name__ == '__main__':
    # Entry point: start the scheduler only when executed as a script, not
    # when imported as a module.
    main()