blob: bee86e2206aa59b2cecc3b3e54b49d93d1798e4b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Directory results are written to; overwritten by the positional argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
# Nice level applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# Section of the global config holding the scheduler's database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
# Root of the autotest installation; defaults to the parent of this file but
# may be overridden via the AUTOTEST_DIR environment variable below.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

# Make the server code importable regardless of how we were launched.
if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names autoserv/crashinfo/parser processes drop in their execution
# directories; used during recovery to find still-running processes.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state, initialized in init()/main().
_db = None                # DatabaseConnection, set up in init()
_shutdown = False         # set True by handle_sigint to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False     # --test flag: dummy autoserv, stress-test database
_base_url = None          # base URL of the AFE frontend, resolved in main
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

# NOTE(review): fileConfig runs at import time, so importing this module has
# the side effect of reconfiguring logging from debug_scheduler.ini.
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
75
mbligh36768f02008-02-22 18:28:33 +000076
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def main():
    """Entry point: run the scheduler, logging any escaping exception.

    SystemExit is re-raised untouched so deliberate exits (e.g. sys.exit
    when the scheduler is disabled) are not logged as errors; everything
    else is logged with a traceback and then re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
89
90
def main_without_exception_handling():
    """Parse options, load configuration, and run the main dispatch loop.

    Expects exactly one positional argument: the results directory.
    Exits the process (sys.exit(1)) when the scheduler is disabled in the
    global config, or when no base URL / server hostname can be resolved.
    Loops on Dispatcher.tick() until handle_sigint sets _shutdown, then
    flushes queued emails and shuts down the status server, drone manager
    and database connection.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # Refuse to start unless explicitly enabled in the global config.
    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific init hook if a site_monitor_db module exists;
    # otherwise this resolves to the no-op _site_init_monitor_db_dummy.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # Comma/space/semicolon/colon-separated list of HQE statuses that should
    # trigger notification emails (lower-cased, empty entries dropped).
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT flips _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly shutdown: flush emails, stop the status server, then tear down
    # the drone manager and database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000180
181
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main dispatch loop to exit cleanly.

    Only sets the module-level _shutdown flag; the loop in
    main_without_exception_handling notices it on its next iteration.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000186
187
def init(logfile):
    """One-time scheduler startup.

    Optionally redirects stdout/stderr to logfile, writes a pidfile,
    connects the scheduler and Django database connections, installs the
    SIGINT handler, and initializes the drone manager from config.

    @param logfile - Optional path; when set, enable_logging() redirects
            stdout/stderr there.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    # Under --test, point the scheduler at the stress-test database instead
    # of the production one.
    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the drones module imported at
    # the top of the file for the rest of this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000221
222
def enable_logging(logfile):
    """Redirect stdout to logfile and stderr to logfile + '.err'.

    Both files are opened unbuffered in append mode.  The redirection is
    done at the OS descriptor level (dup2) as well as at the Python level,
    so output from child processes is captured too.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)

    stdout_stream = open(out_file, "a", buffering=0)
    stderr_stream = open(err_file, "a", buffering=0)

    # Repoint the process-level file descriptors first...
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    # ...then rebind the Python-level stream objects.
    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000235
236
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        # Fall back to the queue entry's job when no Job was passed in.
        owning_job = job or queue_entry.job
        command += ['-u', owning_job.owner, '-l', owning_job.name]
    return command + extra_args
258
259
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    For example, when more than one atomic group label is found on a
    single host.
    """
262
263
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready Hosts.

    refresh() snapshots the relevant scheduling state (ready hosts, job
    ACLs, job dependencies, label memberships) into in-memory caches; the
    find_eligible_* methods then consume those caches, removing hosts from
    self._hosts_available as they are handed out so each host is used at
    most once per scheduling cycle.
    """
    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with %s filled by id_list) and build an id -> set map.

        @param query - SQL selecting (left_id, right_id) pairs with a single
                %s placeholder for the id list.
        @param id_list - ids to substitute into the query; empty list short-
                circuits to {} without touching the database.
        @param flip - When True, swap the roles of the two columns.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} the job may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) maps for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild all scheduling caches for this cycle.

        @param pending_queue_entries - HostQueueEntries whose jobs' ACLs,
                ineligibilities and dependencies should be loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine ACL, dependency, only_if_needed and atomic-group checks."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if it is eligible, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first usable, eligible host in the metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while
        # iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
613
614
showard170873e2009-01-07 00:22:26 +0000615class Dispatcher(object):
    def __init__(self):
        # Agents currently managed by this dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes of host id / queue entry id -> set of Agents touching that
        # object; maintained by add_agent()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
    def initialize(self, recover_hosts=True):
        """One-time startup work after construction.

        Primes the cleanup tasks, recovers processes left over from a
        previous scheduler run, and optionally recovers hosts.

        @param recover_hosts - When True (the default), also run host
                recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000637
638
    def tick(self):
        """Run one iteration of the main dispatch loop.

        Drone state is refreshed first; queued drone actions and pending
        emails are flushed at the end, after all scheduling work.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000649
showard97aed502008-11-04 02:01:24 +0000650
mblighf3294cc2009-04-08 21:17:38 +0000651 def _run_cleanup(self):
652 self._periodic_cleanup.run_cleanup_maybe()
653 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000654
mbligh36768f02008-02-22 18:28:33 +0000655
showard170873e2009-01-07 00:22:26 +0000656 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
657 for object_id in object_ids:
658 agent_dict.setdefault(object_id, set()).add(agent)
659
660
661 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
662 for object_id in object_ids:
663 assert object_id in agent_dict
664 agent_dict[object_id].remove(agent)
665
666
jadmanski0afbb632008-06-06 21:10:57 +0000667 def add_agent(self, agent):
668 self._agents.append(agent)
669 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000670 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
671 self._register_agent_for_ids(self._queue_entry_agents,
672 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def get_agents_for_entry(self, queue_entry):
676 """
677 Find agents corresponding to the specified queue_entry.
678 """
showardd3dc1992009-04-22 21:01:40 +0000679 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000680
681
682 def host_has_agent(self, host):
683 """
684 Determine if there is currently an Agent present using this host.
685 """
686 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def remove_agent(self, agent):
690 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000691 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
692 agent)
693 self._unregister_agent_for_ids(self._queue_entry_agents,
694 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000695
696
jadmanski0afbb632008-06-06 21:10:57 +0000697 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000698 self._register_pidfiles()
699 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000700 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000701 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000702 self._reverify_remaining_hosts()
703 # reinitialize drones after killing orphaned processes, since they can
704 # leave around files when they die
705 _drone_manager.execute_actions()
706 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000707
showard170873e2009-01-07 00:22:26 +0000708
709 def _register_pidfiles(self):
710 # during recovery we may need to read pidfiles for both running and
711 # parsing entries
712 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000713 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000714 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000715 for pidfile_name in _ALL_PIDFILE_NAMES:
716 pidfile_id = _drone_manager.get_pidfile_id_from(
717 queue_entry.execution_tag(), pidfile_name=pidfile_name)
718 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000719
720
showardd3dc1992009-04-22 21:01:40 +0000721 def _recover_entries_with_status(self, status, orphans, pidfile_name,
722 recover_entries_fn):
723 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000724 for queue_entry in queue_entries:
725 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000726 # synchronous job we've already recovered
727 continue
showardd3dc1992009-04-22 21:01:40 +0000728 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000729 execution_tag = queue_entry.execution_tag()
730 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000731 run_monitor.attach_to_existing_process(execution_tag,
732 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000733
734 log_message = ('Recovering %s entry %s ' %
735 (status.lower(),
736 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000737 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000738 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000739 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000740 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000741 continue
mbligh90a549d2008-03-25 23:52:34 +0000742
showard597bfd32009-05-08 18:22:50 +0000743 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000744 run_monitor.get_process())
745 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
746 orphans.discard(run_monitor.get_process())
747
748
749 def _kill_remaining_orphan_processes(self, orphans):
750 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000751 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000752 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000753
showard170873e2009-01-07 00:22:26 +0000754
showardd3dc1992009-04-22 21:01:40 +0000755 def _recover_running_entries(self, orphans):
756 def recover_entries(job, queue_entries, run_monitor):
757 if run_monitor is not None:
758 queue_task = RecoveryQueueTask(job=job,
759 queue_entries=queue_entries,
760 run_monitor=run_monitor)
761 self.add_agent(Agent(tasks=[queue_task],
762 num_processes=len(queue_entries)))
763 # else, _requeue_other_active_entries will cover this
764
765 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
766 orphans, '.autoserv_execute',
767 recover_entries)
768
769
770 def _recover_gathering_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 gather_task = GatherLogsTask(job, queue_entries,
773 run_monitor=run_monitor)
774 self.add_agent(Agent([gather_task]))
775
776 self._recover_entries_with_status(
777 models.HostQueueEntry.Status.GATHERING,
778 orphans, _CRASHINFO_PID_FILE, recover_entries)
779
780
781 def _recover_parsing_entries(self, orphans):
782 def recover_entries(job, queue_entries, run_monitor):
783 reparse_task = FinalReparseTask(queue_entries,
784 run_monitor=run_monitor)
785 self.add_agent(Agent([reparse_task], num_processes=0))
786
787 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
788 orphans, _PARSER_PID_FILE,
789 recover_entries)
790
791
    def _recover_all_recoverable_entries(self):
        """Re-attach to Running, Gathering and Parsing entries, then kill
        any orphaned autoserv processes that no entry claimed (each recover
        step discards its claimed processes from the orphan set)."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000798
showard97aed502008-11-04 02:01:24 +0000799
showard170873e2009-01-07 00:22:26 +0000800 def _requeue_other_active_entries(self):
801 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000802 where='active AND NOT complete AND '
803 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000804 for queue_entry in queue_entries:
805 if self.get_agents_for_entry(queue_entry):
806 # entry has already been recovered
807 continue
showardd3dc1992009-04-22 21:01:40 +0000808 if queue_entry.aborted:
809 queue_entry.abort(self)
810 continue
811
812 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000813 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000814 if queue_entry.host:
815 tasks = queue_entry.host.reverify_tasks()
816 self.add_agent(Agent(tasks))
817 agent = queue_entry.requeue()
818
819
showard1ff7b2e2009-05-15 23:17:18 +0000820 def _find_reverify(self):
showarda64e52a2009-06-08 23:24:08 +0000821 self._reverify_hosts_where("status = 'Reverify'", cleanup=False)
showard1ff7b2e2009-05-15 23:17:18 +0000822
823
showard170873e2009-01-07 00:22:26 +0000824 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000825 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000826 self._reverify_hosts_where("""(status = 'Repairing' OR
827 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000828 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000829
showard170873e2009-01-07 00:22:26 +0000830 # recover "Running" hosts with no active queue entries, although this
831 # should never happen
832 message = ('Recovering running host %s - this probably indicates a '
833 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000834 self._reverify_hosts_where("""status = 'Running' AND
835 id NOT IN (SELECT host_id
836 FROM host_queue_entries
837 WHERE active)""",
838 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000839
840
jadmanski0afbb632008-06-06 21:10:57 +0000841 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000842 print_message='Reverifying host %s',
843 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000844 full_where='locked = 0 AND invalid = 0 AND ' + where
845 for host in Host.fetch(where=full_where):
846 if self.host_has_agent(host):
847 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000848 continue
showard170873e2009-01-07 00:22:26 +0000849 if print_message:
showardb18134f2009-03-20 20:52:18 +0000850 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000851 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000852 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000853
854
jadmanski0afbb632008-06-06 21:10:57 +0000855 def _recover_hosts(self):
856 # recover "Repair Failed" hosts
857 message = 'Reverifying dead host %s'
858 self._reverify_hosts_where("status = 'Repair Failed'",
859 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000860
861
showard04c82c52008-05-29 19:38:12 +0000862
showardb95b1bd2008-08-15 18:11:04 +0000863 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000864 # prioritize by job priority, then non-metahost over metahost, then FIFO
865 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000866 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000867 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000868 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000869
870
showard89f84db2009-03-12 20:39:13 +0000871 def _refresh_pending_queue_entries(self):
872 """
873 Lookup the pending HostQueueEntries and call our HostScheduler
874 refresh() method given that list. Return the list.
875
876 @returns A list of pending HostQueueEntries sorted in priority order.
877 """
showard63a34772008-08-18 19:32:50 +0000878 queue_entries = self._get_pending_queue_entries()
879 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000880 return []
showardb95b1bd2008-08-15 18:11:04 +0000881
showard63a34772008-08-18 19:32:50 +0000882 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000883
showard89f84db2009-03-12 20:39:13 +0000884 return queue_entries
885
886
887 def _schedule_atomic_group(self, queue_entry):
888 """
889 Schedule the given queue_entry on an atomic group of hosts.
890
891 Returns immediately if there are insufficient available hosts.
892
893 Creates new HostQueueEntries based off of queue_entry for the
894 scheduled hosts and starts them all running.
895 """
896 # This is a virtual host queue entry representing an entire
897 # atomic group, find a group and schedule their hosts.
898 group_hosts = self._host_scheduler.find_eligible_atomic_group(
899 queue_entry)
900 if not group_hosts:
901 return
902 # The first assigned host uses the original HostQueueEntry
903 group_queue_entries = [queue_entry]
904 for assigned_host in group_hosts[1:]:
905 # Create a new HQE for every additional assigned_host.
906 new_hqe = HostQueueEntry.clone(queue_entry)
907 new_hqe.save()
908 group_queue_entries.append(new_hqe)
909 assert len(group_queue_entries) == len(group_hosts)
910 for queue_entry, host in itertools.izip(group_queue_entries,
911 group_hosts):
912 self._run_queue_entry(queue_entry, host)
913
914
915 def _schedule_new_jobs(self):
916 queue_entries = self._refresh_pending_queue_entries()
917 if not queue_entries:
918 return
919
showard63a34772008-08-18 19:32:50 +0000920 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000921 if (queue_entry.atomic_group_id is None or
922 queue_entry.host_id is not None):
923 assigned_host = self._host_scheduler.find_eligible_host(
924 queue_entry)
925 if assigned_host:
926 self._run_queue_entry(queue_entry, assigned_host)
927 else:
928 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000929
930
931 def _run_queue_entry(self, queue_entry, host):
932 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000933 # in some cases (synchronous jobs with run_verify=False), agent may be
934 # None
showard9976ce92008-10-15 20:28:13 +0000935 if agent:
936 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000937
938
jadmanski0afbb632008-06-06 21:10:57 +0000939 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000940 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
941 for agent in self.get_agents_for_entry(entry):
942 agent.abort()
943 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000944
945
showard324bf812009-01-20 23:23:38 +0000946 def _can_start_agent(self, agent, num_started_this_cycle,
947 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000948 # always allow zero-process agents to run
949 if agent.num_processes == 0:
950 return True
951 # don't allow any nonzero-process agents to run after we've reached a
952 # limit (this avoids starvation of many-process agents)
953 if have_reached_limit:
954 return False
955 # total process throttling
showard324bf812009-01-20 23:23:38 +0000956 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000957 return False
958 # if a single agent exceeds the per-cycle throttling, still allow it to
959 # run when it's the first agent in the cycle
960 if num_started_this_cycle == 0:
961 return True
962 # per-cycle throttling
963 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000964 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000965 return False
966 return True
967
968
jadmanski0afbb632008-06-06 21:10:57 +0000969 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000970 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000971 have_reached_limit = False
972 # iterate over copy, so we can remove agents during iteration
973 for agent in list(self._agents):
974 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000975 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000976 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000977 continue
978 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000979 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000980 have_reached_limit):
981 have_reached_limit = True
982 continue
showard4c5374f2008-09-04 17:02:56 +0000983 num_started_this_cycle += agent.num_processes
984 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000985 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000986 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000987
988
showard29f7cd22009-04-29 21:16:24 +0000989 def _process_recurring_runs(self):
990 recurring_runs = models.RecurringRun.objects.filter(
991 start_date__lte=datetime.datetime.now())
992 for rrun in recurring_runs:
993 # Create job from template
994 job = rrun.job
995 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000996 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000997
998 host_objects = info['hosts']
999 one_time_hosts = info['one_time_hosts']
1000 metahost_objects = info['meta_hosts']
1001 dependencies = info['dependencies']
1002 atomic_group = info['atomic_group']
1003
1004 for host in one_time_hosts or []:
1005 this_host = models.Host.create_one_time_host(host.hostname)
1006 host_objects.append(this_host)
1007
1008 try:
1009 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001010 options=options,
showard29f7cd22009-04-29 21:16:24 +00001011 host_objects=host_objects,
1012 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001013 atomic_group=atomic_group)
1014
1015 except Exception, ex:
1016 logging.exception(ex)
1017 #TODO send email
1018
1019 if rrun.loop_count == 1:
1020 rrun.delete()
1021 else:
1022 if rrun.loop_count != 0: # if not infinite loop
1023 # calculate new start_date
1024 difference = datetime.timedelta(seconds=rrun.loop_period)
1025 rrun.start_date = rrun.start_date + difference
1026 rrun.loop_count -= 1
1027 rrun.save()
1028
1029
showard170873e2009-01-07 00:22:26 +00001030class PidfileRunMonitor(object):
1031 """
1032 Client must call either run() to start a new process or
1033 attach_to_existing_process().
1034 """
mbligh36768f02008-02-22 18:28:33 +00001035
showard170873e2009-01-07 00:22:26 +00001036 class _PidfileException(Exception):
1037 """
1038 Raised when there's some unexpected behavior with the pid file, but only
1039 used internally (never allowed to escape this class).
1040 """
mbligh36768f02008-02-22 18:28:33 +00001041
1042
showard170873e2009-01-07 00:22:26 +00001043 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001044 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001045 self._start_time = None
1046 self.pidfile_id = None
1047 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001048
1049
showard170873e2009-01-07 00:22:26 +00001050 def _add_nice_command(self, command, nice_level):
1051 if not nice_level:
1052 return command
1053 return ['nice', '-n', str(nice_level)] + command
1054
1055
1056 def _set_start_time(self):
1057 self._start_time = time.time()
1058
1059
1060 def run(self, command, working_directory, nice_level=None, log_file=None,
1061 pidfile_name=None, paired_with_pidfile=None):
1062 assert command is not None
1063 if nice_level is not None:
1064 command = ['nice', '-n', str(nice_level)] + command
1065 self._set_start_time()
1066 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001067 command, working_directory, pidfile_name=pidfile_name,
1068 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001069
1070
showardd3dc1992009-04-22 21:01:40 +00001071 def attach_to_existing_process(self, execution_tag,
1072 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001073 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001074 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1075 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001076 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001077
1078
jadmanski0afbb632008-06-06 21:10:57 +00001079 def kill(self):
showard170873e2009-01-07 00:22:26 +00001080 if self.has_process():
1081 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001082
mbligh36768f02008-02-22 18:28:33 +00001083
showard170873e2009-01-07 00:22:26 +00001084 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001085 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001086 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001087
1088
showard170873e2009-01-07 00:22:26 +00001089 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001090 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001091 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001092 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001093
1094
showard170873e2009-01-07 00:22:26 +00001095 def _read_pidfile(self, use_second_read=False):
1096 assert self.pidfile_id is not None, (
1097 'You must call run() or attach_to_existing_process()')
1098 contents = _drone_manager.get_pidfile_contents(
1099 self.pidfile_id, use_second_read=use_second_read)
1100 if contents.is_invalid():
1101 self._state = drone_manager.PidfileContents()
1102 raise self._PidfileException(contents)
1103 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001104
1105
showard21baa452008-10-21 00:08:39 +00001106 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001107 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1108 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001109 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001110 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001111 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001112
1113
1114 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001115 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001116 return
mblighbb421852008-03-11 22:36:16 +00001117
showard21baa452008-10-21 00:08:39 +00001118 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001119
showard170873e2009-01-07 00:22:26 +00001120 if self._state.process is None:
1121 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001122 return
mbligh90a549d2008-03-25 23:52:34 +00001123
showard21baa452008-10-21 00:08:39 +00001124 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001125 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001126 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001127 return
mbligh90a549d2008-03-25 23:52:34 +00001128
showard170873e2009-01-07 00:22:26 +00001129 # pid but no running process - maybe process *just* exited
1130 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001131 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001132 # autoserv exited without writing an exit code
1133 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001134 self._handle_pidfile_error(
1135 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001136
showard21baa452008-10-21 00:08:39 +00001137
1138 def _get_pidfile_info(self):
1139 """\
1140 After completion, self._state will contain:
1141 pid=None, exit_status=None if autoserv has not yet run
1142 pid!=None, exit_status=None if autoserv is running
1143 pid!=None, exit_status!=None if autoserv has completed
1144 """
1145 try:
1146 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001147 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001148 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001149
1150
showard170873e2009-01-07 00:22:26 +00001151 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001152 """\
1153 Called when no pidfile is found or no pid is in the pidfile.
1154 """
showard170873e2009-01-07 00:22:26 +00001155 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001156 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001157 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1158 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001159 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001160 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001161
1162
showard35162b02009-03-03 02:17:30 +00001163 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001164 """\
1165 Called when autoserv has exited without writing an exit status,
1166 or we've timed out waiting for autoserv to write a pid to the
1167 pidfile. In either case, we just return failure and the caller
1168 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001169
showard170873e2009-01-07 00:22:26 +00001170 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001171 """
1172 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001173 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001174 self._state.exit_status = 1
1175 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001176
1177
jadmanski0afbb632008-06-06 21:10:57 +00001178 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001179 self._get_pidfile_info()
1180 return self._state.exit_status
1181
1182
1183 def num_tests_failed(self):
1184 self._get_pidfile_info()
1185 assert self._state.num_tests_failed is not None
1186 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001187
1188
mbligh36768f02008-02-22 18:28:33 +00001189class Agent(object):
showard170873e2009-01-07 00:22:26 +00001190 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001191 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001192 self.queue = None
jadmanski0afbb632008-06-06 21:10:57 +00001193 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001194 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001195
showard170873e2009-01-07 00:22:26 +00001196 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1197 for task in tasks)
1198 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1199
showardd3dc1992009-04-22 21:01:40 +00001200 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001201 for task in tasks:
1202 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001203
1204
showardd3dc1992009-04-22 21:01:40 +00001205 def _clear_queue(self):
1206 self.queue = Queue.Queue(0)
1207
1208
showard170873e2009-01-07 00:22:26 +00001209 def _union_ids(self, id_lists):
1210 return set(itertools.chain(*id_lists))
1211
1212
jadmanski0afbb632008-06-06 21:10:57 +00001213 def add_task(self, task):
1214 self.queue.put_nowait(task)
1215 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001216
1217
jadmanski0afbb632008-06-06 21:10:57 +00001218 def tick(self):
showard21baa452008-10-21 00:08:39 +00001219 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001220 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001221 self.active_task.poll()
1222 if not self.active_task.is_done():
1223 return
1224 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001225
1226
jadmanski0afbb632008-06-06 21:10:57 +00001227 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001228 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001229 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001230 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001231 if not self.active_task.success:
1232 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001233 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001234
jadmanski0afbb632008-06-06 21:10:57 +00001235 if not self.is_done():
1236 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001240 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001241 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1242 # get reset.
1243 new_agent = Agent(self.active_task.failure_tasks)
1244 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001245
mblighe2586682008-02-29 22:45:46 +00001246
showard4c5374f2008-09-04 17:02:56 +00001247 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001248 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001249
1250
jadmanski0afbb632008-06-06 21:10:57 +00001251 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001252 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001253
1254
showardd3dc1992009-04-22 21:01:40 +00001255 def abort(self):
showard08a36412009-05-05 01:01:13 +00001256 # abort tasks until the queue is empty or a task ignores the abort
1257 while not self.is_done():
1258 if not self.active_task:
1259 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001260 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001261 if not self.active_task.aborted:
1262 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001263 return
1264 self.active_task = None
1265
showardd3dc1992009-04-22 21:01:40 +00001266
mbligh36768f02008-02-22 18:28:33 +00001267class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001268 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1269 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001270 self.done = False
1271 self.failure_tasks = failure_tasks
1272 self.started = False
1273 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001274 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001275 self.task = None
1276 self.agent = None
1277 self.monitor = None
1278 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001279 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001280 self.queue_entry_ids = []
1281 self.host_ids = []
1282 self.log_file = None
1283
1284
1285 def _set_ids(self, host=None, queue_entries=None):
1286 if queue_entries and queue_entries != [None]:
1287 self.host_ids = [entry.host.id for entry in queue_entries]
1288 self.queue_entry_ids = [entry.id for entry in queue_entries]
1289 else:
1290 assert host
1291 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001292
1293
jadmanski0afbb632008-06-06 21:10:57 +00001294 def poll(self):
showard08a36412009-05-05 01:01:13 +00001295 if not self.started:
1296 self.start()
1297 self.tick()
1298
1299
1300 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001301 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001302 exit_code = self.monitor.exit_code()
1303 if exit_code is None:
1304 return
1305 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001306 else:
1307 success = False
mbligh36768f02008-02-22 18:28:33 +00001308
jadmanski0afbb632008-06-06 21:10:57 +00001309 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001310
1311
jadmanski0afbb632008-06-06 21:10:57 +00001312 def is_done(self):
1313 return self.done
mbligh36768f02008-02-22 18:28:33 +00001314
1315
jadmanski0afbb632008-06-06 21:10:57 +00001316 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001317 if self.done:
1318 return
jadmanski0afbb632008-06-06 21:10:57 +00001319 self.done = True
1320 self.success = success
1321 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001322
1323
jadmanski0afbb632008-06-06 21:10:57 +00001324 def prolog(self):
1325 pass
mblighd64e5702008-04-04 21:39:28 +00001326
1327
jadmanski0afbb632008-06-06 21:10:57 +00001328 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001329 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001330
mbligh36768f02008-02-22 18:28:33 +00001331
jadmanski0afbb632008-06-06 21:10:57 +00001332 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001333 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001334 _drone_manager.copy_to_results_repository(
1335 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001336
1337
jadmanski0afbb632008-06-06 21:10:57 +00001338 def epilog(self):
1339 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001340
1341
jadmanski0afbb632008-06-06 21:10:57 +00001342 def start(self):
1343 assert self.agent
1344
1345 if not self.started:
1346 self.prolog()
1347 self.run()
1348
1349 self.started = True
1350
1351
1352 def abort(self):
1353 if self.monitor:
1354 self.monitor.kill()
1355 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001356 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001357 self.cleanup()
1358
1359
showard170873e2009-01-07 00:22:26 +00001360 def set_host_log_file(self, base_name, host):
1361 filename = '%s.%s' % (time.time(), base_name)
1362 self.log_file = os.path.join('hosts', host.hostname, filename)
1363
1364
showardde634ee2009-01-30 01:44:24 +00001365 def _get_consistent_execution_tag(self, queue_entries):
1366 first_execution_tag = queue_entries[0].execution_tag()
1367 for queue_entry in queue_entries[1:]:
1368 assert queue_entry.execution_tag() == first_execution_tag, (
1369 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1370 queue_entry,
1371 first_execution_tag,
1372 queue_entries[0]))
1373 return first_execution_tag
1374
1375
showarda1e74b32009-05-12 17:32:04 +00001376 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001377 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001378 if use_monitor is None:
1379 assert self.monitor
1380 use_monitor = self.monitor
1381 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001382 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001383 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001384 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001385 results_path)
showardde634ee2009-01-30 01:44:24 +00001386
showarda1e74b32009-05-12 17:32:04 +00001387
    def _parse_results(self, queue_entries):
        # Schedule a FinalReparseTask over these entries via the dispatcher.
        # num_processes=0: presumably because FinalReparseTask throttles its
        # own process count (see _can_run_new_parse) -- confirm.
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1391
1392
showarda1e74b32009-05-12 17:32:04 +00001393 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1394 self._copy_results(queue_entries, use_monitor)
1395 self._parse_results(queue_entries)
1396
1397
showardd3dc1992009-04-22 21:01:40 +00001398 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001399 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001400 self.monitor = PidfileRunMonitor()
1401 self.monitor.run(self.cmd, self._working_directory,
1402 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001403 log_file=self.log_file,
1404 pidfile_name=pidfile_name,
1405 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001406
1407
showardd9205182009-04-27 20:09:55 +00001408class TaskWithJobKeyvals(object):
1409 """AgentTask mixin providing functionality to help with job keyval files."""
1410 _KEYVAL_FILE = 'keyval'
1411 def _format_keyval(self, key, value):
1412 return '%s=%s' % (key, value)
1413
1414
1415 def _keyval_path(self):
1416 """Subclasses must override this"""
1417 raise NotImplemented
1418
1419
1420 def _write_keyval_after_job(self, field, value):
1421 assert self.monitor
1422 if not self.monitor.has_process():
1423 return
1424 _drone_manager.write_lines_to_file(
1425 self._keyval_path(), [self._format_keyval(field, value)],
1426 paired_with_process=self.monitor.get_process())
1427
1428
1429 def _job_queued_keyval(self, job):
1430 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1431
1432
1433 def _write_job_finished(self):
1434 self._write_keyval_after_job("job_finished", int(time.time()))
1435
1436
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """Run 'autoserv -R' to repair a host.

    If the repair fails and a queue entry was supplied, that entry is marked
    failed and the repair logs are copied into the job's results.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        # NOTE(review): requeue() presumably returns the entry to the queue so
        # it can be rescheduled elsewhere while this host repairs -- confirm.
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals are written into the temporary repair results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        # Mark the associated queue entry failed and surface the repair logs
        # as that entry's job results.
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        # write job keyvals, then copy the repair logs into the entry's
        # execution directory before failing the entry
        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self.temp_results_dir + '/',
                destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001507
1508
showard8fe93b52008-11-18 17:53:22 +00001509class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001510 def epilog(self):
1511 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001512 should_copy_results = (self.queue_entry and not self.success
1513 and not self.queue_entry.meta_host)
1514 if should_copy_results:
1515 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001516 destination = os.path.join(self.queue_entry.execution_tag(),
1517 os.path.basename(self.log_file))
1518 _drone_manager.copy_to_results_repository(
1519 self.monitor.get_process(), self.log_file,
1520 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001521
1522
class VerifyTask(PreJobTask):
    """Run 'autoserv -v' against a host to check that it is usable.

    Constructed with either a queue entry (verify before a job) or a bare
    host (standalone verify), never both.  A failed verify falls back to a
    RepairTask.
    """
    def __init__(self, queue_entry=None, host=None):
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
                self.host.hostname, self.temp_results_dir, ['-v'],
                queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001553
1554
showardd9205182009-04-27 20:09:55 +00001555class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001556 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001557 self.job = job
1558 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001559 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001560 super(QueueTask, self).__init__(cmd, self._execution_tag())
1561 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001562
1563
showard73ec0442009-02-07 02:05:20 +00001564 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001565 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001566
1567
1568 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1569 keyval_contents = '\n'.join(self._format_keyval(key, value)
1570 for key, value in keyval_dict.iteritems())
1571 # always end with a newline to allow additional keyvals to be written
1572 keyval_contents += '\n'
1573 _drone_manager.attach_file_to_execution(self._execution_tag(),
1574 keyval_contents,
1575 file_path=keyval_path)
1576
1577
1578 def _write_keyvals_before_job(self, keyval_dict):
1579 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1580
1581
showard170873e2009-01-07 00:22:26 +00001582 def _write_host_keyvals(self, host):
1583 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1584 host.hostname)
1585 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001586 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1587 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001588
1589
showard170873e2009-01-07 00:22:26 +00001590 def _execution_tag(self):
1591 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001592
1593
jadmanski0afbb632008-06-06 21:10:57 +00001594 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001595 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001596 keyval_dict = {queued_key: queued_time}
1597 if self.group_name:
1598 keyval_dict['host_group_name'] = self.group_name
1599 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001600 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001601 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001602 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001603 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001604 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001605 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001606 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001607 assert len(self.queue_entries) == 1
1608 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001609
1610
showard35162b02009-03-03 02:17:30 +00001611 def _write_lost_process_error_file(self):
1612 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1613 _drone_manager.write_lines_to_file(error_file_path,
1614 [_LOST_PROCESS_ERROR])
1615
1616
showardd3dc1992009-04-22 21:01:40 +00001617 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001618 if not self.monitor:
1619 return
1620
showardd9205182009-04-27 20:09:55 +00001621 self._write_job_finished()
1622
showardd3dc1992009-04-22 21:01:40 +00001623 # both of these conditionals can be true, iff the process ran, wrote a
1624 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001625 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001626 gather_task = GatherLogsTask(self.job, self.queue_entries)
1627 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001628
1629 if self.monitor.lost_process:
1630 self._write_lost_process_error_file()
1631 for queue_entry in self.queue_entries:
1632 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001633
1634
showardcbd74612008-11-19 21:42:02 +00001635 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001636 _drone_manager.write_lines_to_file(
1637 os.path.join(self._execution_tag(), 'status.log'),
1638 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001639 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001640
1641
jadmanskif7fa2cc2008-10-01 14:13:23 +00001642 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001643 if not self.monitor or not self.monitor.has_process():
1644 return
1645
jadmanskif7fa2cc2008-10-01 14:13:23 +00001646 # build up sets of all the aborted_by and aborted_on values
1647 aborted_by, aborted_on = set(), set()
1648 for queue_entry in self.queue_entries:
1649 if queue_entry.aborted_by:
1650 aborted_by.add(queue_entry.aborted_by)
1651 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1652 aborted_on.add(t)
1653
1654 # extract some actual, unique aborted by value and write it out
1655 assert len(aborted_by) <= 1
1656 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001657 aborted_by_value = aborted_by.pop()
1658 aborted_on_value = max(aborted_on)
1659 else:
1660 aborted_by_value = 'autotest_system'
1661 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001662
showarda0382352009-02-11 23:36:43 +00001663 self._write_keyval_after_job("aborted_by", aborted_by_value)
1664 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001665
showardcbd74612008-11-19 21:42:02 +00001666 aborted_on_string = str(datetime.datetime.fromtimestamp(
1667 aborted_on_value))
1668 self._write_status_comment('Job aborted by %s on %s' %
1669 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001670
1671
jadmanski0afbb632008-06-06 21:10:57 +00001672 def abort(self):
1673 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001674 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001675 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001676
1677
jadmanski0afbb632008-06-06 21:10:57 +00001678 def epilog(self):
1679 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001680 self._finish_task()
1681 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001682
1683
mblighbb421852008-03-11 22:36:16 +00001684class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001685 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001686 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001687 self.monitor = run_monitor
1688 self.started = True
1689 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001690
1691
jadmanski0afbb632008-06-06 21:10:57 +00001692 def run(self):
showard5add1c82009-05-26 19:27:46 +00001693 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001694
1695
jadmanski0afbb632008-06-06 21:10:57 +00001696 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001697 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001698
1699
showardd3dc1992009-04-22 21:01:40 +00001700class PostJobTask(AgentTask):
1701 def __init__(self, queue_entries, pidfile_name, logfile_name,
1702 run_monitor=None):
1703 """
1704 If run_monitor != None, we're recovering a running task.
1705 """
1706 self._queue_entries = queue_entries
1707 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001708
1709 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1710 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1711 self._autoserv_monitor = PidfileRunMonitor()
1712 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1713 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1714
1715 if _testing_mode:
1716 command = 'true'
1717 else:
1718 command = self._generate_command(self._results_dir)
1719
1720 super(PostJobTask, self).__init__(cmd=command,
1721 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001722 # this must happen *after* the super call
1723 self.monitor = run_monitor
1724 if run_monitor:
1725 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001726
1727 self.log_file = os.path.join(self._execution_tag, logfile_name)
1728 self._final_status = self._determine_final_status()
1729
1730
1731 def _generate_command(self, results_dir):
1732 raise NotImplementedError('Subclasses must override this')
1733
1734
1735 def _job_was_aborted(self):
1736 was_aborted = None
1737 for queue_entry in self._queue_entries:
1738 queue_entry.update_from_database()
1739 if was_aborted is None: # first queue entry
1740 was_aborted = bool(queue_entry.aborted)
1741 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1742 email_manager.manager.enqueue_notify_email(
1743 'Inconsistent abort state',
1744 'Queue entries have inconsistent abort state: ' +
1745 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1746 # don't crash here, just assume true
1747 return True
1748 return was_aborted
1749
1750
1751 def _determine_final_status(self):
1752 if self._job_was_aborted():
1753 return models.HostQueueEntry.Status.ABORTED
1754
1755 # we'll use a PidfileRunMonitor to read the autoserv exit status
1756 if self._autoserv_monitor.exit_code() == 0:
1757 return models.HostQueueEntry.Status.COMPLETED
1758 return models.HostQueueEntry.Status.FAILED
1759
1760
1761 def run(self):
showard5add1c82009-05-26 19:27:46 +00001762 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001763
showard5add1c82009-05-26 19:27:46 +00001764 # make sure we actually have results to work with.
1765 # this should never happen in normal operation.
1766 if not self._autoserv_monitor.has_process():
1767 email_manager.manager.enqueue_notify_email(
1768 'No results in post-job task',
1769 'No results in post-job task at %s' %
1770 self._autoserv_monitor.pidfile_id)
1771 self.finished(False)
1772 return
1773
1774 super(PostJobTask, self).run(
1775 pidfile_name=self._pidfile_name,
1776 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001777
1778
1779 def _set_all_statuses(self, status):
1780 for queue_entry in self._queue_entries:
1781 queue_entry.set_status(status)
1782
1783
1784 def abort(self):
1785 # override AgentTask.abort() to avoid killing the process and ending
1786 # the task. post-job tasks continue when the job is aborted.
1787 pass
1788
1789
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        # run_monitor is passed when recovering an already-running
        # collect_crashinfo process (see PostJobTask)
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # build the 'autoserv --collect-crashinfo' command over all hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        # Decide whether hosts get a cleanup/reboot: always after an abort,
        # always when reboot_after=ALWAYS, and for IF_ALL_TESTS_PASSED only
        # when the job completed with zero test failures.  Hosts that don't
        # need a reboot go straight back to Ready.
        reboot_after = self._job.reboot_after
        do_reboot = False
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # only copy and reparse if autoserv actually ran and left results
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001857
1858
showard8fe93b52008-11-18 17:53:22 +00001859class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001860 def __init__(self, host=None, queue_entry=None):
1861 assert bool(host) ^ bool(queue_entry)
1862 if queue_entry:
1863 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001864 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001865 self.host = host
showard170873e2009-01-07 00:22:26 +00001866
1867 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001868 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1869 ['--cleanup'],
1870 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001871 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001872 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1873 failure_tasks=[repair_task])
1874
1875 self._set_ids(host=host, queue_entries=[queue_entry])
1876 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001877
mblighd5c95802008-03-05 00:33:46 +00001878
jadmanski0afbb632008-06-06 21:10:57 +00001879 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001880 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001881 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001882 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001883
mblighd5c95802008-03-05 00:33:46 +00001884
showard21baa452008-10-21 00:08:39 +00001885 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001886 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001887 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001888 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001889 self.host.update_field('dirty', 0)
1890
1891
showardd3dc1992009-04-22 21:01:40 +00001892class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001893 _num_running_parses = 0
1894
showardd3dc1992009-04-22 21:01:40 +00001895 def __init__(self, queue_entries, run_monitor=None):
1896 super(FinalReparseTask, self).__init__(queue_entries,
1897 pidfile_name=_PARSER_PID_FILE,
1898 logfile_name='.parse.log',
1899 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001900 # don't use _set_ids, since we don't want to set the host_ids
1901 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001902 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001903
showard97aed502008-11-04 02:01:24 +00001904
1905 @classmethod
1906 def _increment_running_parses(cls):
1907 cls._num_running_parses += 1
1908
1909
1910 @classmethod
1911 def _decrement_running_parses(cls):
1912 cls._num_running_parses -= 1
1913
1914
1915 @classmethod
1916 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001917 return (cls._num_running_parses <
1918 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001919
1920
1921 def prolog(self):
1922 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001923 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001924
1925
1926 def epilog(self):
1927 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001928 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001929
1930
showardd3dc1992009-04-22 21:01:40 +00001931 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00001932 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00001933 results_dir]
showard97aed502008-11-04 02:01:24 +00001934
1935
showard08a36412009-05-05 01:01:13 +00001936 def tick(self):
1937 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00001938 # and we can, at which point we revert to default behavior
1939 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00001940 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00001941 else:
1942 self._try_starting_parse()
1943
1944
1945 def run(self):
1946 # override run() to not actually run unless we can
1947 self._try_starting_parse()
1948
1949
1950 def _try_starting_parse(self):
1951 if not self._can_run_new_parse():
1952 return
showard170873e2009-01-07 00:22:26 +00001953
showard97aed502008-11-04 02:01:24 +00001954 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001955 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001956
showard97aed502008-11-04 02:01:24 +00001957 self._increment_running_parses()
1958 self._parse_started = True
1959
1960
1961 def finished(self, success):
1962 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001963 if self._parse_started:
1964 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001965
1966
showardc9ae1782009-01-30 01:42:37 +00001967class SetEntryPendingTask(AgentTask):
1968 def __init__(self, queue_entry):
1969 super(SetEntryPendingTask, self).__init__(cmd='')
1970 self._queue_entry = queue_entry
1971 self._set_ids(queue_entries=[queue_entry])
1972
1973
1974 def run(self):
1975 agent = self._queue_entry.on_pending()
1976 if agent:
1977 self.agent.dispatcher.add_agent(agent)
1978 self.finished(True)
1979
1980
showarda3c58572009-03-12 20:36:59 +00001981class DBError(Exception):
1982 """Raised by the DBObject constructor when its select fails."""
1983
1984
mbligh36768f02008-02-22 18:28:33 +00001985class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001986 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001987
1988 # Subclasses MUST override these:
1989 _table_name = ''
1990 _fields = ()
1991
showarda3c58572009-03-12 20:36:59 +00001992 # A mapping from (type, id) to the instance of the object for that
1993 # particular id. This prevents us from creating new Job() and Host()
1994 # instances for every HostQueueEntry object that we instantiate as
1995 # multiple HQEs often share the same Job.
1996 _instances_by_type_and_id = weakref.WeakValueDictionary()
1997 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001998
showarda3c58572009-03-12 20:36:59 +00001999
2000 def __new__(cls, id=None, **kwargs):
2001 """
2002 Look to see if we already have an instance for this particular type
2003 and id. If so, use it instead of creating a duplicate instance.
2004 """
2005 if id is not None:
2006 instance = cls._instances_by_type_and_id.get((cls, id))
2007 if instance:
2008 return instance
2009 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2010
2011
    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load (or refresh) this object's fields from the database.

        @param id - Database row id; mutually exclusive with row.
        @param row - A pre-fetched SELECT row; mutually exclusive with id.
        @param new_record - True if this object does not yet exist in the DB.
        @param always_query - If False, a cached, already-initialized
                instance skips re-querying entirely.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            # a cached instance being re-queried: log any fields that changed
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True
2040
2041
2042 @classmethod
2043 def _clear_instance_cache(cls):
2044 """Used for testing, clear the internal instance cache."""
2045 cls._instances_by_type_and_id.clear()
2046
2047
showardccbd6c52009-03-21 00:10:21 +00002048 def _fetch_row_from_db(self, row_id):
2049 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2050 rows = _db.execute(sql, (row_id,))
2051 if not rows:
showard76e29d12009-04-15 21:53:10 +00002052 raise DBError("row not found (table=%s, row id=%s)"
2053 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002054 return rows[0]
2055
2056
showarda3c58572009-03-12 20:36:59 +00002057 def _assert_row_length(self, row):
2058 assert len(row) == len(self._fields), (
2059 "table = %s, row = %s/%d, fields = %s/%d" % (
2060 self.__table, row, len(row), self._fields, len(self._fields)))
2061
2062
2063 def _compare_fields_in_row(self, row):
2064 """
2065 Given a row as returned by a SELECT query, compare it to our existing
2066 in memory fields.
2067
2068 @param row - A sequence of values corresponding to fields named in
2069 The class attribute _fields.
2070
2071 @returns A dictionary listing the differences keyed by field name
2072 containing tuples of (current_value, row_value).
2073 """
2074 self._assert_row_length(row)
2075 differences = {}
2076 for field, row_value in itertools.izip(self._fields, row):
2077 current_value = getattr(self, field)
2078 if current_value != row_value:
2079 differences[field] = (current_value, row_value)
2080 return differences
showard2bab8f42008-11-12 18:15:22 +00002081
2082
2083 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002084 """
2085 Update our field attributes using a single row returned by SELECT.
2086
2087 @param row - A sequence of values corresponding to fields named in
2088 the class fields list.
2089 """
2090 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002091
showard2bab8f42008-11-12 18:15:22 +00002092 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002093 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002094 setattr(self, field, value)
2095 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002096
showard2bab8f42008-11-12 18:15:22 +00002097 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002098
mblighe2586682008-02-29 22:45:46 +00002099
showardccbd6c52009-03-21 00:10:21 +00002100 def update_from_database(self):
2101 assert self.id is not None
2102 row = self._fetch_row_from_db(self.id)
2103 self._update_fields_from_row(row)
2104
2105
jadmanski0afbb632008-06-06 21:10:57 +00002106 def count(self, where, table = None):
2107 if not table:
2108 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002109
jadmanski0afbb632008-06-06 21:10:57 +00002110 rows = _db.execute("""
2111 SELECT count(*) FROM %s
2112 WHERE %s
2113 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002114
jadmanski0afbb632008-06-06 21:10:57 +00002115 assert len(rows) == 1
2116
2117 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002118
2119
showardd3dc1992009-04-22 21:01:40 +00002120 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002121 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002122
showard2bab8f42008-11-12 18:15:22 +00002123 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002124 return
mbligh36768f02008-02-22 18:28:33 +00002125
mblighf8c624d2008-07-03 16:58:45 +00002126 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002127 _db.execute(query, (value, self.id))
2128
showard2bab8f42008-11-12 18:15:22 +00002129 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002130
2131
jadmanski0afbb632008-06-06 21:10:57 +00002132 def save(self):
2133 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002134 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002135 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002136 values = []
2137 for key in keys:
2138 value = getattr(self, key)
2139 if value is None:
2140 values.append('NULL')
2141 else:
2142 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002143 values_str = ','.join(values)
2144 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2145 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002146 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002147 # Update our id to the one the database just assigned to us.
2148 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002149
2150
jadmanski0afbb632008-06-06 21:10:57 +00002151 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002152 self._instances_by_type_and_id.pop((type(self), id), None)
2153 self._initialized = False
2154 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002155 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2156 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002157
2158
showard63a34772008-08-18 19:32:50 +00002159 @staticmethod
2160 def _prefix_with(string, prefix):
2161 if string:
2162 string = prefix + string
2163 return string
2164
2165
jadmanski0afbb632008-06-06 21:10:57 +00002166 @classmethod
showard989f25d2008-10-01 11:38:11 +00002167 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002168 """
2169 Construct instances of our class based on the given database query.
2170
2171 @yields One class instance for each row fetched.
2172 """
showard63a34772008-08-18 19:32:50 +00002173 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2174 where = cls._prefix_with(where, 'WHERE ')
2175 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002176 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002177 'joins' : joins,
2178 'where' : where,
2179 'order_by' : order_by})
2180 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002181 for row in rows:
2182 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002183
mbligh36768f02008-02-22 18:28:33 +00002184
class IneligibleHostQueue(DBObject):
    """
    Row of ineligible_host_queues: records a (job, host) block.  Created by
    HostQueueEntry.block_host() when a host is assigned and removed by
    unblock_host() when it is released.
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002188
2189
showard89f84db2009-03-12 20:39:13 +00002190class AtomicGroup(DBObject):
2191 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002192 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2193 'invalid')
showard89f84db2009-03-12 20:39:13 +00002194
2195
showard989f25d2008-10-01 11:38:11 +00002196class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002197 _table_name = 'labels'
2198 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002199 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002200
2201
mbligh36768f02008-02-22 18:28:33 +00002202class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002203 _table_name = 'hosts'
2204 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2205 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2206
2207
jadmanski0afbb632008-06-06 21:10:57 +00002208 def current_task(self):
2209 rows = _db.execute("""
2210 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2211 """, (self.id,))
2212
2213 if len(rows) == 0:
2214 return None
2215 else:
2216 assert len(rows) == 1
2217 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002218 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002219
2220
jadmanski0afbb632008-06-06 21:10:57 +00002221 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002222 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002223 if self.current_task():
2224 self.current_task().requeue()
2225
showard6ae5ea92009-02-25 00:11:51 +00002226
jadmanski0afbb632008-06-06 21:10:57 +00002227 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002228 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002229 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002230
2231
showard170873e2009-01-07 00:22:26 +00002232 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002233 """
showard170873e2009-01-07 00:22:26 +00002234 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002235 """
2236 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002237 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002238 FROM labels
2239 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002240 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002241 ORDER BY labels.name
2242 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002243 platform = None
2244 all_labels = []
2245 for label_name, is_platform in rows:
2246 if is_platform:
2247 platform = label_name
2248 all_labels.append(label_name)
2249 return platform, all_labels
2250
2251
showarda64e52a2009-06-08 23:24:08 +00002252 def reverify_tasks(self, cleanup=True):
2253 tasks = [VerifyTask(host=self)]
2254 if cleanup:
2255 tasks.insert(0, CleanupTask(host=self))
showard170873e2009-01-07 00:22:26 +00002256 # just to make sure this host does not get taken away
2257 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002258 return tasks
showardd8e548a2008-09-09 03:04:57 +00002259
2260
showard54c1ea92009-05-20 00:32:58 +00002261 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2262
2263
2264 @classmethod
2265 def cmp_for_sort(cls, a, b):
2266 """
2267 A comparison function for sorting Host objects by hostname.
2268
2269 This strips any trailing numeric digits, ignores leading 0s and
2270 compares hostnames by the leading name and the trailing digits as a
2271 number. If both hostnames do not match this pattern, they are simply
2272 compared as lower case strings.
2273
2274 Example of how hostnames will be sorted:
2275
2276 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2277
2278 This hopefully satisfy most people's hostname sorting needs regardless
2279 of their exact naming schemes. Nobody sane should have both a host10
2280 and host010 (but the algorithm works regardless).
2281 """
2282 lower_a = a.hostname.lower()
2283 lower_b = b.hostname.lower()
2284 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2285 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2286 if match_a and match_b:
2287 name_a, number_a_str = match_a.groups()
2288 name_b, number_b_str = match_b.groups()
2289 number_a = int(number_a_str.lstrip('0'))
2290 number_b = int(number_b_str.lstrip('0'))
2291 result = cmp((name_a, number_a), (name_b, number_b))
2292 if result == 0 and lower_a != lower_b:
2293 # If they compared equal above but the lower case names are
2294 # indeed different, don't report equality. abc012 != abc12.
2295 return cmp(lower_a, lower_b)
2296 return result
2297 else:
2298 return cmp(lower_a, lower_b)
2299
2300
mbligh36768f02008-02-22 18:28:33 +00002301class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002302 _table_name = 'host_queue_entries'
2303 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002304 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002305 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002306
2307
showarda3c58572009-03-12 20:36:59 +00002308 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002309 assert id or row
showarda3c58572009-03-12 20:36:59 +00002310 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002311 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002312
jadmanski0afbb632008-06-06 21:10:57 +00002313 if self.host_id:
2314 self.host = Host(self.host_id)
2315 else:
2316 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002317
showard170873e2009-01-07 00:22:26 +00002318 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002319 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002320
2321
showard89f84db2009-03-12 20:39:13 +00002322 @classmethod
2323 def clone(cls, template):
2324 """
2325 Creates a new row using the values from a template instance.
2326
2327 The new instance will not exist in the database or have a valid
2328 id attribute until its save() method is called.
2329 """
2330 assert isinstance(template, cls)
2331 new_row = [getattr(template, field) for field in cls._fields]
2332 clone = cls(row=new_row, new_record=True)
2333 clone.id = None
2334 return clone
2335
2336
showardc85c21b2008-11-24 22:17:37 +00002337 def _view_job_url(self):
2338 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2339
2340
showardf1ae3542009-05-11 19:26:02 +00002341 def get_labels(self):
2342 """
2343 Get all labels associated with this host queue entry (either via the
2344 meta_host or as a job dependency label). The labels yielded are not
2345 guaranteed to be unique.
2346
2347 @yields Label instances associated with this host_queue_entry.
2348 """
2349 if self.meta_host:
2350 yield Label(id=self.meta_host, always_query=False)
2351 labels = Label.fetch(
2352 joins="JOIN jobs_dependency_labels AS deps "
2353 "ON (labels.id = deps.label_id)",
2354 where="deps.job_id = %d" % self.job.id)
2355 for label in labels:
2356 yield label
2357
2358
jadmanski0afbb632008-06-06 21:10:57 +00002359 def set_host(self, host):
2360 if host:
2361 self.queue_log_record('Assigning host ' + host.hostname)
2362 self.update_field('host_id', host.id)
2363 self.update_field('active', True)
2364 self.block_host(host.id)
2365 else:
2366 self.queue_log_record('Releasing host')
2367 self.unblock_host(self.host.id)
2368 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002369
jadmanski0afbb632008-06-06 21:10:57 +00002370 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002371
2372
jadmanski0afbb632008-06-06 21:10:57 +00002373 def get_host(self):
2374 return self.host
mbligh36768f02008-02-22 18:28:33 +00002375
2376
jadmanski0afbb632008-06-06 21:10:57 +00002377 def queue_log_record(self, log_line):
2378 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002379 _drone_manager.write_lines_to_file(self.queue_log_path,
2380 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002381
2382
jadmanski0afbb632008-06-06 21:10:57 +00002383 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002384 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002385 row = [0, self.job.id, host_id]
2386 block = IneligibleHostQueue(row=row, new_record=True)
2387 block.save()
mblighe2586682008-02-29 22:45:46 +00002388
2389
jadmanski0afbb632008-06-06 21:10:57 +00002390 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002391 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002392 blocks = IneligibleHostQueue.fetch(
2393 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2394 for block in blocks:
2395 block.delete()
mblighe2586682008-02-29 22:45:46 +00002396
2397
showard2bab8f42008-11-12 18:15:22 +00002398 def set_execution_subdir(self, subdir=None):
2399 if subdir is None:
2400 assert self.get_host()
2401 subdir = self.get_host().hostname
2402 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002403
2404
showard6355f6b2008-12-05 18:52:13 +00002405 def _get_hostname(self):
2406 if self.host:
2407 return self.host.hostname
2408 return 'no host'
2409
2410
showard170873e2009-01-07 00:22:26 +00002411 def __str__(self):
2412 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2413
2414
jadmanski0afbb632008-06-06 21:10:57 +00002415 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002416 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002417
showardb18134f2009-03-20 20:52:18 +00002418 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002419
showardc85c21b2008-11-24 22:17:37 +00002420 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002421 self.update_field('complete', False)
2422 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002423
jadmanski0afbb632008-06-06 21:10:57 +00002424 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002425 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002426 self.update_field('complete', False)
2427 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002428
showardc85c21b2008-11-24 22:17:37 +00002429 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002430 self.update_field('complete', True)
2431 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002432
2433 should_email_status = (status.lower() in _notify_email_statuses or
2434 'all' in _notify_email_statuses)
2435 if should_email_status:
2436 self._email_on_status(status)
2437
2438 self._email_on_job_complete()
2439
2440
2441 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002442 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002443
2444 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2445 self.job.id, self.job.name, hostname, status)
2446 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2447 self.job.id, self.job.name, hostname, status,
2448 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002449 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002450
2451
2452 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002453 if not self.job.is_finished():
2454 return
showard542e8402008-09-19 20:16:18 +00002455
showardc85c21b2008-11-24 22:17:37 +00002456 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002457 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002458 for queue_entry in hosts_queue:
2459 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002460 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002461 queue_entry.status))
2462
2463 summary_text = "\n".join(summary_text)
2464 status_counts = models.Job.objects.get_status_counts(
2465 [self.job.id])[self.job.id]
2466 status = ', '.join('%d %s' % (count, status) for status, count
2467 in status_counts.iteritems())
2468
2469 subject = 'Autotest: Job ID: %s "%s" %s' % (
2470 self.job.id, self.job.name, status)
2471 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2472 self.job.id, self.job.name, status, self._view_job_url(),
2473 summary_text)
showard170873e2009-01-07 00:22:26 +00002474 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002475
2476
showard89f84db2009-03-12 20:39:13 +00002477 def run(self, assigned_host=None):
2478 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002479 assert assigned_host
2480 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002481 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002482
showardb18134f2009-03-20 20:52:18 +00002483 logging.info("%s/%s/%s scheduled on %s, status=%s",
2484 self.job.name, self.meta_host, self.atomic_group_id,
2485 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002486
jadmanski0afbb632008-06-06 21:10:57 +00002487 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002488
showard6ae5ea92009-02-25 00:11:51 +00002489
jadmanski0afbb632008-06-06 21:10:57 +00002490 def requeue(self):
2491 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002492 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002493 # verify/cleanup failure sets the execution subdir, so reset it here
2494 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002495 if self.meta_host:
2496 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002497
2498
jadmanski0afbb632008-06-06 21:10:57 +00002499 def handle_host_failure(self):
2500 """\
2501 Called when this queue entry's host has failed verification and
2502 repair.
2503 """
2504 assert not self.meta_host
2505 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002506 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002507
2508
jadmanskif7fa2cc2008-10-01 14:13:23 +00002509 @property
2510 def aborted_by(self):
2511 self._load_abort_info()
2512 return self._aborted_by
2513
2514
2515 @property
2516 def aborted_on(self):
2517 self._load_abort_info()
2518 return self._aborted_on
2519
2520
2521 def _load_abort_info(self):
2522 """ Fetch info about who aborted the job. """
2523 if hasattr(self, "_aborted_by"):
2524 return
2525 rows = _db.execute("""
2526 SELECT users.login, aborted_host_queue_entries.aborted_on
2527 FROM aborted_host_queue_entries
2528 INNER JOIN users
2529 ON users.id = aborted_host_queue_entries.aborted_by_id
2530 WHERE aborted_host_queue_entries.queue_entry_id = %s
2531 """, (self.id,))
2532 if rows:
2533 self._aborted_by, self._aborted_on = rows[0]
2534 else:
2535 self._aborted_by = self._aborted_on = None
2536
2537
showardb2e2c322008-10-14 17:33:55 +00002538 def on_pending(self):
2539 """
2540 Called when an entry in a synchronous job has passed verify. If the
2541 job is ready to run, returns an agent to run the job. Returns None
2542 otherwise.
2543 """
2544 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002545 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002546 if self.job.is_ready():
2547 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002548 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002549 return None
2550
2551
showardd3dc1992009-04-22 21:01:40 +00002552 def abort(self, dispatcher):
2553 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002554
showardd3dc1992009-04-22 21:01:40 +00002555 Status = models.HostQueueEntry.Status
2556 has_running_job_agent = (
2557 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2558 and dispatcher.get_agents_for_entry(self))
2559 if has_running_job_agent:
2560 # do nothing; post-job tasks will finish and then mark this entry
2561 # with status "Aborted" and take care of the host
2562 return
2563
2564 if self.status in (Status.STARTING, Status.PENDING):
2565 self.host.set_status(models.Host.Status.READY)
2566 elif self.status == Status.VERIFYING:
2567 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2568
2569 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002570
2571 def execution_tag(self):
2572 assert self.execution_subdir
2573 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002574
2575
mbligh36768f02008-02-22 18:28:33 +00002576class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002577 _table_name = 'jobs'
2578 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2579 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002580 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002581 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002582
2583
showarda3c58572009-03-12 20:36:59 +00002584 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002585 assert id or row
showarda3c58572009-03-12 20:36:59 +00002586 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002587
mblighe2586682008-02-29 22:45:46 +00002588
jadmanski0afbb632008-06-06 21:10:57 +00002589 def is_server_job(self):
2590 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002591
2592
showard170873e2009-01-07 00:22:26 +00002593 def tag(self):
2594 return "%s-%s" % (self.id, self.owner)
2595
2596
jadmanski0afbb632008-06-06 21:10:57 +00002597 def get_host_queue_entries(self):
2598 rows = _db.execute("""
2599 SELECT * FROM host_queue_entries
2600 WHERE job_id= %s
2601 """, (self.id,))
2602 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002603
jadmanski0afbb632008-06-06 21:10:57 +00002604 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002605
jadmanski0afbb632008-06-06 21:10:57 +00002606 return entries
mbligh36768f02008-02-22 18:28:33 +00002607
2608
jadmanski0afbb632008-06-06 21:10:57 +00002609 def set_status(self, status, update_queues=False):
2610 self.update_field('status',status)
2611
2612 if update_queues:
2613 for queue_entry in self.get_host_queue_entries():
2614 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002615
2616
jadmanski0afbb632008-06-06 21:10:57 +00002617 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002618 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2619 status='Pending')
2620 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002621
2622
jadmanski0afbb632008-06-06 21:10:57 +00002623 def num_machines(self, clause = None):
2624 sql = "job_id=%s" % self.id
2625 if clause:
2626 sql += " AND (%s)" % clause
2627 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002628
2629
jadmanski0afbb632008-06-06 21:10:57 +00002630 def num_queued(self):
2631 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002632
2633
jadmanski0afbb632008-06-06 21:10:57 +00002634 def num_active(self):
2635 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002636
2637
jadmanski0afbb632008-06-06 21:10:57 +00002638 def num_complete(self):
2639 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002640
2641
jadmanski0afbb632008-06-06 21:10:57 +00002642 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002643 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002644
mbligh36768f02008-02-22 18:28:33 +00002645
showard6bb7c292009-01-30 01:44:51 +00002646 def _not_yet_run_entries(self, include_verifying=True):
2647 statuses = [models.HostQueueEntry.Status.QUEUED,
2648 models.HostQueueEntry.Status.PENDING]
2649 if include_verifying:
2650 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2651 return models.HostQueueEntry.objects.filter(job=self.id,
2652 status__in=statuses)
2653
2654
    def _stop_all_entries(self):
        """
        Mark every not-yet-started entry of this job Stopped, releasing the
        host of any Pending entry back to Ready first.
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # Pending entries already hold a host; give it back.
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
2668
showard2bab8f42008-11-12 18:15:22 +00002669 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002670 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002671 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002672 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002673
2674
jadmanski0afbb632008-06-06 21:10:57 +00002675 def write_to_machines_file(self, queue_entry):
2676 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002677 file_path = os.path.join(self.tag(), '.machines')
2678 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002679
2680
showardf1ae3542009-05-11 19:26:02 +00002681 def _next_group_name(self, group_name=''):
2682 """@returns a directory name to use for the next host group results."""
2683 if group_name:
2684 # Sanitize for use as a pathname.
2685 group_name = group_name.replace(os.path.sep, '_')
2686 if group_name.startswith('.'):
2687 group_name = '_' + group_name[1:]
2688 # Add a separator between the group name and 'group%d'.
2689 group_name += '.'
2690 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002691 query = models.HostQueueEntry.objects.filter(
2692 job=self.id).values('execution_subdir').distinct()
2693 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002694 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2695 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002696 if ids:
2697 next_id = max(ids) + 1
2698 else:
2699 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002700 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002701
2702
showard170873e2009-01-07 00:22:26 +00002703 def _write_control_file(self, execution_tag):
2704 control_path = _drone_manager.attach_file_to_execution(
2705 execution_tag, self.control_file)
2706 return control_path
mbligh36768f02008-02-22 18:28:33 +00002707
showardb2e2c322008-10-14 17:33:55 +00002708
showard2bab8f42008-11-12 18:15:22 +00002709 def get_group_entries(self, queue_entry_from_group):
2710 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002711 return list(HostQueueEntry.fetch(
2712 where='job_id=%s AND execution_subdir=%s',
2713 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002714
2715
showardb2e2c322008-10-14 17:33:55 +00002716 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002717 assert queue_entries
2718 execution_tag = queue_entries[0].execution_tag()
2719 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002720 hostnames = ','.join([entry.get_host().hostname
2721 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002722
showard87ba02a2009-04-20 19:37:32 +00002723 params = _autoserv_command_line(
2724 hostnames, execution_tag,
2725 ['-P', execution_tag, '-n',
2726 _drone_manager.absolute_path(control_path)],
2727 job=self)
mbligh36768f02008-02-22 18:28:33 +00002728
jadmanski0afbb632008-06-06 21:10:57 +00002729 if not self.is_server_job():
2730 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002731
showardb2e2c322008-10-14 17:33:55 +00002732 return params
mblighe2586682008-02-29 22:45:46 +00002733
mbligh36768f02008-02-22 18:28:33 +00002734
showardc9ae1782009-01-30 01:42:37 +00002735 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002736 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002737 return True
showard0fc38302008-10-23 00:44:07 +00002738 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002739 return queue_entry.get_host().dirty
2740 return False
showard21baa452008-10-21 00:08:39 +00002741
showardc9ae1782009-01-30 01:42:37 +00002742
2743 def _should_run_verify(self, queue_entry):
2744 do_not_verify = (queue_entry.host.protection ==
2745 host_protections.Protection.DO_NOT_VERIFY)
2746 if do_not_verify:
2747 return False
2748 return self.run_verify
2749
2750
    def _get_pre_job_tasks(self, queue_entry):
        """
        Build the ordered pre-job task list for an entry: optional cleanup,
        optional verify, then the task that marks the entry Pending.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks
2759
2760
showardf1ae3542009-05-11 19:26:02 +00002761 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002762 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002763 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002764 else:
showardf1ae3542009-05-11 19:26:02 +00002765 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002766 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002767 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002768 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002769
2770 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002771 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002772
2773
    def _choose_group_to_run(self, include_queue_entry):
        """
        Select the full set of queue entries that will run together with
        include_queue_entry, and assign them all a new execution subdir.

        @param include_queue_entry: a HostQueueEntry that must be part of
                the chosen group.
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        if include_queue_entry.atomic_group_id:
            # always_query=False: the cached row is sufficient here.
            atomic_group = AtomicGroup(include_queue_entry.atomic_group_id,
                                       always_query=False)
        else:
            atomic_group = None

        chosen_entries = [include_queue_entry]
        # Atomic groups run on up to max_number_of_machines hosts;
        # ordinary synchronous jobs need exactly synch_count entries.
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            # Pull this job's other Pending entries from the database,
            # excluding the entry we were given.
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002823
2824
2825 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002826 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002827 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2828 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002829
showardf1ae3542009-05-11 19:26:02 +00002830 queue_entries, group_name = self._choose_group_to_run(queue_entry)
2831 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00002832
2833
showardf1ae3542009-05-11 19:26:02 +00002834 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00002835 for queue_entry in queue_entries:
2836 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002837 params = self._get_autoserv_params(queue_entries)
2838 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00002839 cmd=params, group_name=group_name)
2840 tasks = [queue_task]
showardb2e2c322008-10-14 17:33:55 +00002841 entry_ids = [entry.id for entry in queue_entries]
2842
showard170873e2009-01-07 00:22:26 +00002843 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002844
2845
# Script entry point: start the scheduler's main loop (main() is defined
# earlier in this file).
if __name__ == '__main__':
    main()