blob: e7c6d1f4dc45072bdc9ae689bde1f3a89b32510d [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
# Default results directory; overwritten with the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
# Nice level that autoserv child processes are launched at.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the scheduler's database settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR in the environment overrides the path derived from __file__.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names dropped into an execution's results directory by the
# different execution stages (autoserv run, crashinfo collection, parsing).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, filled in by init() / main_without_exception_handling().
_db = None            # scheduler's DatabaseConnection, created in init()
_shutdown = False     # set by handle_sigint() to stop the main tick loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False # True under --test: use dummy autoserv, test database
_base_url = None      # base URL of the AFE web interface ('.../afe/')
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
75
mbligh36768f02008-02-22 18:28:33 +000076
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def main():
    """
    Top-level entry point.  Delegates to main_without_exception_handling()
    and logs any unexpected exception before re-raising it.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # deliberate exits (e.g. bad config, usage error) propagate untouched
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
89
90
91def main_without_exception_handling():
jadmanski0afbb632008-06-06 21:10:57 +000092 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000093
jadmanski0afbb632008-06-06 21:10:57 +000094 parser = optparse.OptionParser(usage)
95 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
96 action='store_true')
97 parser.add_option('--logfile', help='Set a log file that all stdout ' +
98 'should be redirected to. Stderr will go to this ' +
99 'file + ".err"')
100 parser.add_option('--test', help='Indicate that scheduler is under ' +
101 'test and should use dummy autoserv and no parsing',
102 action='store_true')
103 (options, args) = parser.parse_args()
104 if len(args) != 1:
105 parser.print_usage()
106 return
mbligh36768f02008-02-22 18:28:33 +0000107
showard5613c662009-06-08 23:30:33 +0000108 scheduler_enabled = global_config.global_config.get_config_value(
109 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
110
111 if not scheduler_enabled:
112 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
113 "global_config's SCHEDULER section to enabled it. Exiting.")
114 print msg
115 sys.exit(1)
116
jadmanski0afbb632008-06-06 21:10:57 +0000117 global RESULTS_DIR
118 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000119
mbligh83c1e9e2009-05-01 23:10:41 +0000120 site_init = utils.import_site_function(__file__,
121 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
122 _site_init_monitor_db_dummy)
123 site_init()
124
showardcca334f2009-03-12 20:38:34 +0000125 # Change the cwd while running to avoid issues incase we were launched from
126 # somewhere odd (such as a random NFS home directory of the person running
127 # sudo to launch us as the appropriate user).
128 os.chdir(RESULTS_DIR)
129
jadmanski0afbb632008-06-06 21:10:57 +0000130 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000131 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
132 "notify_email_statuses",
133 default='')
showardc85c21b2008-11-24 22:17:37 +0000134 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000135 _notify_email_statuses = [status for status in
136 re.split(r'[\s,;:]', notify_statuses_list.lower())
137 if status]
showardc85c21b2008-11-24 22:17:37 +0000138
jadmanski0afbb632008-06-06 21:10:57 +0000139 if options.test:
140 global _autoserv_path
141 _autoserv_path = 'autoserv_dummy'
142 global _testing_mode
143 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000144
mbligh37eceaa2008-12-15 22:56:37 +0000145 # AUTOTEST_WEB.base_url is still a supported config option as some people
146 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000147 global _base_url
showard170873e2009-01-07 00:22:26 +0000148 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
149 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000150 if config_base_url:
151 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000152 else:
mbligh37eceaa2008-12-15 22:56:37 +0000153 # For the common case of everything running on a single server you
154 # can just set the hostname in a single place in the config file.
155 server_name = c.get_config_value('SERVER', 'hostname')
156 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000157 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000158 sys.exit(1)
159 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000160
showardc5afc462009-01-13 00:09:39 +0000161 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000162 server.start()
163
jadmanski0afbb632008-06-06 21:10:57 +0000164 try:
showardc5afc462009-01-13 00:09:39 +0000165 init(options.logfile)
166 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000167 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000168
jadmanski0afbb632008-06-06 21:10:57 +0000169 while not _shutdown:
170 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000171 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000172 except:
showard170873e2009-01-07 00:22:26 +0000173 email_manager.manager.log_stacktrace(
174 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000175
showard170873e2009-01-07 00:22:26 +0000176 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000177 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000178 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000179 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000180
181
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatcher loop to shut down cleanly."""
    logging.info("Shutdown request received.")
    global _shutdown
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000186
187
def init(logfile):
    """
    One-time scheduler startup: optional log redirection, pidfile, database
    connection, Django autocommit, SIGINT handler and drone manager setup.

    @param logfile - Path to redirect stdout/stderr to, or a false value to
            leave them alone (see enable_logging()).
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # under --test, point the scheduler at the stress-test database
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local 'drones' shadows the imported 'drones' module
    # for the rest of this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000221
222
def enable_logging(logfile):
    """
    Redirect this process's stdout and stderr into unbuffered append-mode
    log files: stdout to `logfile`, stderr to `logfile`.err.

    @param logfile - Path of the stdout log file.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    # Replace the underlying file descriptors as well as the Python-level
    # stream objects.
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000235
236
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build an autoserv invocation as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - Where the results will be written (-r).
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [
        _autoserv_path, '-p',
        '-m', machines,
        '-r', _drone_manager.absolute_path(results_dir),
    ]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    return command + extra_args
258
259
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
    pass
262
263
class HostScheduler(object):
    """
    Matches pending host queue entries to ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, dependency
    labels) from the database into instance dicts; find_eligible_host() and
    find_eligible_atomic_group() then consume that cached state, mutating it
    as hosts are handed out so each host is used at most once per cycle.
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        # map host id -> Host object for O(1) lookup/removal
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string for SQL "IN (...)" clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with %s filled by id_list) and build a mapping of
        first column -> set of second column (reversed when flip=True)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # rows are (left_id, right_id) pairs; aggregate into id -> set of ids
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids of the job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids the job may not run on."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids the host belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host ids, host_id -> label ids) mappings built
        from a single hosts_labels query."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # map label id -> Label object for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for one scheduling pass."""
        self._hosts_available = self._get_ready_hosts()

        # only fetch per-job data for the jobs we were actually given
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # job owner and host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # the host must carry every label the job depends on
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Reject hosts carrying an only_if_needed label that the job neither
        depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # a host is eligible when it passes ACL, dependency, only_if_needed
        # and atomic-group checks for this entry's job
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # entry requested a specific host; hand it out only if eligible and
        # still available (pop marks it used for this cycle)
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick the first usable, eligible host carrying the entry's metahost
        label, or None if there is none this cycle."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
613
614
showard170873e2009-01-07 00:22:26 +0000615class Dispatcher(object):
    def __init__(self):
        # Active Agent objects, processed each tick by _handle_agents().
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes mapping host id / queue entry id -> set of Agents; kept in
        # sync by add_agent()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
    def initialize(self, recover_hosts=True):
        """
        Prepare the dispatcher for its first tick: start the cleanup tasks,
        then recover state left behind by a previous scheduler run.

        @param recover_hosts - If True, also attempt host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000637
638
    def tick(self):
        """
        One iteration of the main scheduling loop (called repeatedly from
        main_without_exception_handling()).  Ordering matters: drone state is
        refreshed first, and queued drone actions/emails are flushed last.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000649
showard97aed502008-11-04 02:01:24 +0000650
mblighf3294cc2009-04-08 21:17:38 +0000651 def _run_cleanup(self):
652 self._periodic_cleanup.run_cleanup_maybe()
653 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000654
mbligh36768f02008-02-22 18:28:33 +0000655
showard170873e2009-01-07 00:22:26 +0000656 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
657 for object_id in object_ids:
658 agent_dict.setdefault(object_id, set()).add(agent)
659
660
661 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
662 for object_id in object_ids:
663 assert object_id in agent_dict
664 agent_dict[object_id].remove(agent)
665
666
jadmanski0afbb632008-06-06 21:10:57 +0000667 def add_agent(self, agent):
668 self._agents.append(agent)
669 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000670 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
671 self._register_agent_for_ids(self._queue_entry_agents,
672 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def get_agents_for_entry(self, queue_entry):
676 """
677 Find agents corresponding to the specified queue_entry.
678 """
showardd3dc1992009-04-22 21:01:40 +0000679 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000680
681
682 def host_has_agent(self, host):
683 """
684 Determine if there is currently an Agent present using this host.
685 """
686 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def remove_agent(self, agent):
690 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000691 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
692 agent)
693 self._unregister_agent_for_ids(self._queue_entry_agents,
694 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000695
696
jadmanski0afbb632008-06-06 21:10:57 +0000697 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000698 self._register_pidfiles()
699 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000700 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000701 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000702 self._reverify_remaining_hosts()
703 # reinitialize drones after killing orphaned processes, since they can
704 # leave around files when they die
705 _drone_manager.execute_actions()
706 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000707
showard170873e2009-01-07 00:22:26 +0000708
709 def _register_pidfiles(self):
710 # during recovery we may need to read pidfiles for both running and
711 # parsing entries
712 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000713 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000714 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000715 for pidfile_name in _ALL_PIDFILE_NAMES:
716 pidfile_id = _drone_manager.get_pidfile_id_from(
717 queue_entry.execution_tag(), pidfile_name=pidfile_name)
718 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000719
720
showardd3dc1992009-04-22 21:01:40 +0000721 def _recover_entries_with_status(self, status, orphans, pidfile_name,
722 recover_entries_fn):
723 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000724 for queue_entry in queue_entries:
725 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000726 # synchronous job we've already recovered
727 continue
showardd3dc1992009-04-22 21:01:40 +0000728 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000729 execution_tag = queue_entry.execution_tag()
730 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000731 run_monitor.attach_to_existing_process(execution_tag,
732 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000733
734 log_message = ('Recovering %s entry %s ' %
735 (status.lower(),
736 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000737 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000738 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000739 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000740 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000741 continue
mbligh90a549d2008-03-25 23:52:34 +0000742
showard597bfd32009-05-08 18:22:50 +0000743 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000744 run_monitor.get_process())
745 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
746 orphans.discard(run_monitor.get_process())
747
748
749 def _kill_remaining_orphan_processes(self, orphans):
750 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000751 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000752 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000753
showard170873e2009-01-07 00:22:26 +0000754
showardd3dc1992009-04-22 21:01:40 +0000755 def _recover_running_entries(self, orphans):
756 def recover_entries(job, queue_entries, run_monitor):
757 if run_monitor is not None:
758 queue_task = RecoveryQueueTask(job=job,
759 queue_entries=queue_entries,
760 run_monitor=run_monitor)
761 self.add_agent(Agent(tasks=[queue_task],
762 num_processes=len(queue_entries)))
763 # else, _requeue_other_active_entries will cover this
764
765 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
766 orphans, '.autoserv_execute',
767 recover_entries)
768
769
770 def _recover_gathering_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 gather_task = GatherLogsTask(job, queue_entries,
773 run_monitor=run_monitor)
774 self.add_agent(Agent([gather_task]))
775
776 self._recover_entries_with_status(
777 models.HostQueueEntry.Status.GATHERING,
778 orphans, _CRASHINFO_PID_FILE, recover_entries)
779
780
781 def _recover_parsing_entries(self, orphans):
782 def recover_entries(job, queue_entries, run_monitor):
783 reparse_task = FinalReparseTask(queue_entries,
784 run_monitor=run_monitor)
785 self.add_agent(Agent([reparse_task], num_processes=0))
786
787 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
788 orphans, _PARSER_PID_FILE,
789 recover_entries)
790
791
    def _recover_all_recoverable_entries(self):
        """Recover Running/Gathering/Parsing entries, then kill orphans.

        Each recovery pass discards the processes it claims from *orphans*;
        whatever remains afterwards is truly orphaned and is killed.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000798
showard97aed502008-11-04 02:01:24 +0000799
showard170873e2009-01-07 00:22:26 +0000800 def _requeue_other_active_entries(self):
801 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000802 where='active AND NOT complete AND '
803 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000804 for queue_entry in queue_entries:
805 if self.get_agents_for_entry(queue_entry):
806 # entry has already been recovered
807 continue
showardd3dc1992009-04-22 21:01:40 +0000808 if queue_entry.aborted:
809 queue_entry.abort(self)
810 continue
811
812 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000813 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000814 if queue_entry.host:
815 tasks = queue_entry.host.reverify_tasks()
816 self.add_agent(Agent(tasks))
817 agent = queue_entry.requeue()
818
819
showard1ff7b2e2009-05-15 23:17:18 +0000820 def _find_reverify(self):
showarda64e52a2009-06-08 23:24:08 +0000821 self._reverify_hosts_where("status = 'Reverify'", cleanup=False)
showard1ff7b2e2009-05-15 23:17:18 +0000822
823
showard170873e2009-01-07 00:22:26 +0000824 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000825 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000826 self._reverify_hosts_where("""(status = 'Repairing' OR
827 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000828 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000829
showard170873e2009-01-07 00:22:26 +0000830 # recover "Running" hosts with no active queue entries, although this
831 # should never happen
832 message = ('Recovering running host %s - this probably indicates a '
833 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000834 self._reverify_hosts_where("""status = 'Running' AND
835 id NOT IN (SELECT host_id
836 FROM host_queue_entries
837 WHERE active)""",
838 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000839
840
jadmanski0afbb632008-06-06 21:10:57 +0000841 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000842 print_message='Reverifying host %s',
843 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000844 full_where='locked = 0 AND invalid = 0 AND ' + where
845 for host in Host.fetch(where=full_where):
846 if self.host_has_agent(host):
847 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000848 continue
showard170873e2009-01-07 00:22:26 +0000849 if print_message:
showardb18134f2009-03-20 20:52:18 +0000850 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000851 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000852 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000853
854
jadmanski0afbb632008-06-06 21:10:57 +0000855 def _recover_hosts(self):
856 # recover "Repair Failed" hosts
857 message = 'Reverifying dead host %s'
858 self._reverify_hosts_where("status = 'Repair Failed'",
859 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000860
861
showard04c82c52008-05-29 19:38:12 +0000862
showardb95b1bd2008-08-15 18:11:04 +0000863 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000864 # prioritize by job priority, then non-metahost over metahost, then FIFO
865 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000866 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000867 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000868 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000869
870
showard89f84db2009-03-12 20:39:13 +0000871 def _refresh_pending_queue_entries(self):
872 """
873 Lookup the pending HostQueueEntries and call our HostScheduler
874 refresh() method given that list. Return the list.
875
876 @returns A list of pending HostQueueEntries sorted in priority order.
877 """
showard63a34772008-08-18 19:32:50 +0000878 queue_entries = self._get_pending_queue_entries()
879 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000880 return []
showardb95b1bd2008-08-15 18:11:04 +0000881
showard63a34772008-08-18 19:32:50 +0000882 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000883
showard89f84db2009-03-12 20:39:13 +0000884 return queue_entries
885
886
887 def _schedule_atomic_group(self, queue_entry):
888 """
889 Schedule the given queue_entry on an atomic group of hosts.
890
891 Returns immediately if there are insufficient available hosts.
892
893 Creates new HostQueueEntries based off of queue_entry for the
894 scheduled hosts and starts them all running.
895 """
896 # This is a virtual host queue entry representing an entire
897 # atomic group, find a group and schedule their hosts.
898 group_hosts = self._host_scheduler.find_eligible_atomic_group(
899 queue_entry)
900 if not group_hosts:
901 return
902 # The first assigned host uses the original HostQueueEntry
903 group_queue_entries = [queue_entry]
904 for assigned_host in group_hosts[1:]:
905 # Create a new HQE for every additional assigned_host.
906 new_hqe = HostQueueEntry.clone(queue_entry)
907 new_hqe.save()
908 group_queue_entries.append(new_hqe)
909 assert len(group_queue_entries) == len(group_hosts)
910 for queue_entry, host in itertools.izip(group_queue_entries,
911 group_hosts):
912 self._run_queue_entry(queue_entry, host)
913
914
915 def _schedule_new_jobs(self):
916 queue_entries = self._refresh_pending_queue_entries()
917 if not queue_entries:
918 return
919
showard63a34772008-08-18 19:32:50 +0000920 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000921 if (queue_entry.atomic_group_id is None or
922 queue_entry.host_id is not None):
923 assigned_host = self._host_scheduler.find_eligible_host(
924 queue_entry)
925 if assigned_host:
926 self._run_queue_entry(queue_entry, assigned_host)
927 else:
928 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000929
930
931 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000932 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
933 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000934
935
jadmanski0afbb632008-06-06 21:10:57 +0000936 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000937 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
938 for agent in self.get_agents_for_entry(entry):
939 agent.abort()
940 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000941
942
showard324bf812009-01-20 23:23:38 +0000943 def _can_start_agent(self, agent, num_started_this_cycle,
944 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000945 # always allow zero-process agents to run
946 if agent.num_processes == 0:
947 return True
948 # don't allow any nonzero-process agents to run after we've reached a
949 # limit (this avoids starvation of many-process agents)
950 if have_reached_limit:
951 return False
952 # total process throttling
showard324bf812009-01-20 23:23:38 +0000953 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000954 return False
955 # if a single agent exceeds the per-cycle throttling, still allow it to
956 # run when it's the first agent in the cycle
957 if num_started_this_cycle == 0:
958 return True
959 # per-cycle throttling
960 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000961 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000962 return False
963 return True
964
965
jadmanski0afbb632008-06-06 21:10:57 +0000966 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000967 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000968 have_reached_limit = False
969 # iterate over copy, so we can remove agents during iteration
970 for agent in list(self._agents):
971 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000972 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000973 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000974 continue
975 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000976 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000977 have_reached_limit):
978 have_reached_limit = True
979 continue
showard4c5374f2008-09-04 17:02:56 +0000980 num_started_this_cycle += agent.num_processes
981 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000982 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000983 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000984
985
showard29f7cd22009-04-29 21:16:24 +0000986 def _process_recurring_runs(self):
987 recurring_runs = models.RecurringRun.objects.filter(
988 start_date__lte=datetime.datetime.now())
989 for rrun in recurring_runs:
990 # Create job from template
991 job = rrun.job
992 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +0000993 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +0000994
995 host_objects = info['hosts']
996 one_time_hosts = info['one_time_hosts']
997 metahost_objects = info['meta_hosts']
998 dependencies = info['dependencies']
999 atomic_group = info['atomic_group']
1000
1001 for host in one_time_hosts or []:
1002 this_host = models.Host.create_one_time_host(host.hostname)
1003 host_objects.append(this_host)
1004
1005 try:
1006 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001007 options=options,
showard29f7cd22009-04-29 21:16:24 +00001008 host_objects=host_objects,
1009 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001010 atomic_group=atomic_group)
1011
1012 except Exception, ex:
1013 logging.exception(ex)
1014 #TODO send email
1015
1016 if rrun.loop_count == 1:
1017 rrun.delete()
1018 else:
1019 if rrun.loop_count != 0: # if not infinite loop
1020 # calculate new start_date
1021 difference = datetime.timedelta(seconds=rrun.loop_period)
1022 rrun.start_date = rrun.start_date + difference
1023 rrun.loop_count -= 1
1024 rrun.save()
1025
1026
showard170873e2009-01-07 00:22:26 +00001027class PidfileRunMonitor(object):
1028 """
1029 Client must call either run() to start a new process or
1030 attach_to_existing_process().
1031 """
mbligh36768f02008-02-22 18:28:33 +00001032
showard170873e2009-01-07 00:22:26 +00001033 class _PidfileException(Exception):
1034 """
1035 Raised when there's some unexpected behavior with the pid file, but only
1036 used internally (never allowed to escape this class).
1037 """
mbligh36768f02008-02-22 18:28:33 +00001038
1039
showard170873e2009-01-07 00:22:26 +00001040 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001041 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001042 self._start_time = None
1043 self.pidfile_id = None
1044 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001045
1046
showard170873e2009-01-07 00:22:26 +00001047 def _add_nice_command(self, command, nice_level):
1048 if not nice_level:
1049 return command
1050 return ['nice', '-n', str(nice_level)] + command
1051
1052
1053 def _set_start_time(self):
1054 self._start_time = time.time()
1055
1056
1057 def run(self, command, working_directory, nice_level=None, log_file=None,
1058 pidfile_name=None, paired_with_pidfile=None):
1059 assert command is not None
1060 if nice_level is not None:
1061 command = ['nice', '-n', str(nice_level)] + command
1062 self._set_start_time()
1063 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001064 command, working_directory, pidfile_name=pidfile_name,
1065 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001066
1067
showardd3dc1992009-04-22 21:01:40 +00001068 def attach_to_existing_process(self, execution_tag,
1069 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001070 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001071 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1072 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001073 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001074
1075
jadmanski0afbb632008-06-06 21:10:57 +00001076 def kill(self):
showard170873e2009-01-07 00:22:26 +00001077 if self.has_process():
1078 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001079
mbligh36768f02008-02-22 18:28:33 +00001080
showard170873e2009-01-07 00:22:26 +00001081 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001082 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001083 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001084
1085
showard170873e2009-01-07 00:22:26 +00001086 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001087 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001088 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001089 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001090
1091
showard170873e2009-01-07 00:22:26 +00001092 def _read_pidfile(self, use_second_read=False):
1093 assert self.pidfile_id is not None, (
1094 'You must call run() or attach_to_existing_process()')
1095 contents = _drone_manager.get_pidfile_contents(
1096 self.pidfile_id, use_second_read=use_second_read)
1097 if contents.is_invalid():
1098 self._state = drone_manager.PidfileContents()
1099 raise self._PidfileException(contents)
1100 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001101
1102
showard21baa452008-10-21 00:08:39 +00001103 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001104 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1105 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001106 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001107 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001108 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001109
1110
1111 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001112 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001113 return
mblighbb421852008-03-11 22:36:16 +00001114
showard21baa452008-10-21 00:08:39 +00001115 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001116
showard170873e2009-01-07 00:22:26 +00001117 if self._state.process is None:
1118 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001119 return
mbligh90a549d2008-03-25 23:52:34 +00001120
showard21baa452008-10-21 00:08:39 +00001121 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001122 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001123 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001124 return
mbligh90a549d2008-03-25 23:52:34 +00001125
showard170873e2009-01-07 00:22:26 +00001126 # pid but no running process - maybe process *just* exited
1127 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001128 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001129 # autoserv exited without writing an exit code
1130 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001131 self._handle_pidfile_error(
1132 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001133
showard21baa452008-10-21 00:08:39 +00001134
1135 def _get_pidfile_info(self):
1136 """\
1137 After completion, self._state will contain:
1138 pid=None, exit_status=None if autoserv has not yet run
1139 pid!=None, exit_status=None if autoserv is running
1140 pid!=None, exit_status!=None if autoserv has completed
1141 """
1142 try:
1143 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001144 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001145 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001146
1147
showard170873e2009-01-07 00:22:26 +00001148 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001149 """\
1150 Called when no pidfile is found or no pid is in the pidfile.
1151 """
showard170873e2009-01-07 00:22:26 +00001152 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001153 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001154 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1155 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001156 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001157 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001158
1159
showard35162b02009-03-03 02:17:30 +00001160 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001161 """\
1162 Called when autoserv has exited without writing an exit status,
1163 or we've timed out waiting for autoserv to write a pid to the
1164 pidfile. In either case, we just return failure and the caller
1165 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001166
showard170873e2009-01-07 00:22:26 +00001167 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001168 """
1169 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001170 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001171 self._state.exit_status = 1
1172 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001173
1174
jadmanski0afbb632008-06-06 21:10:57 +00001175 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001176 self._get_pidfile_info()
1177 return self._state.exit_status
1178
1179
1180 def num_tests_failed(self):
1181 self._get_pidfile_info()
1182 assert self._state.num_tests_failed is not None
1183 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001184
1185
mbligh36768f02008-02-22 18:28:33 +00001186class Agent(object):
showard77182562009-06-10 00:16:05 +00001187 """
1188 An agent for use by the Dispatcher class to perform a sequence of tasks.
1189
1190 The following methods are required on all task objects:
1191 poll() - Called periodically to let the task check its status and
1192 update its internal state. If the task succeeded.
1193 is_done() - Returns True if the task is finished.
1194 abort() - Called when an abort has been requested. The task must
1195 set its aborted attribute to True if it actually aborted.
1196
1197 The following attributes are required on all task objects:
1198 aborted - bool, True if this task was aborted.
1199 failure_tasks - A sequence of tasks to be run using a new Agent
1200 by the dispatcher should this task fail.
1201 success - bool, True if this task succeeded.
1202 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1203 host_ids - A sequence of Host ids this task represents.
1204
1205 The following attribute is written to all task objects:
1206 agent - A reference to the Agent instance that the task has been
1207 added to.
1208 """
1209
1210
showard170873e2009-01-07 00:22:26 +00001211 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001212 """
1213 @param tasks: A list of tasks as described in the class docstring.
1214 @param num_processes: The number of subprocesses the Agent represents.
1215 This is used by the Dispatcher for managing the load on the
1216 system. Defaults to 1.
1217 """
jadmanski0afbb632008-06-06 21:10:57 +00001218 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001219 self.queue = None
showard77182562009-06-10 00:16:05 +00001220 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001221 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001222 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001223
showard170873e2009-01-07 00:22:26 +00001224 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1225 for task in tasks)
1226 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1227
showardd3dc1992009-04-22 21:01:40 +00001228 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001229 for task in tasks:
1230 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001231
1232
showardd3dc1992009-04-22 21:01:40 +00001233 def _clear_queue(self):
1234 self.queue = Queue.Queue(0)
1235
1236
showard170873e2009-01-07 00:22:26 +00001237 def _union_ids(self, id_lists):
1238 return set(itertools.chain(*id_lists))
1239
1240
jadmanski0afbb632008-06-06 21:10:57 +00001241 def add_task(self, task):
1242 self.queue.put_nowait(task)
1243 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001244
1245
jadmanski0afbb632008-06-06 21:10:57 +00001246 def tick(self):
showard21baa452008-10-21 00:08:39 +00001247 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001248 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001249 self.active_task.poll()
1250 if not self.active_task.is_done():
1251 return
1252 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001256 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001257 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001258 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001259 if not self.active_task.success:
1260 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001261 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001262
jadmanski0afbb632008-06-06 21:10:57 +00001263 if not self.is_done():
1264 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001265
1266
jadmanski0afbb632008-06-06 21:10:57 +00001267 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001268 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001269 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1270 # get reset.
1271 new_agent = Agent(self.active_task.failure_tasks)
1272 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001273
mblighe2586682008-02-29 22:45:46 +00001274
showard4c5374f2008-09-04 17:02:56 +00001275 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001276 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001277
1278
jadmanski0afbb632008-06-06 21:10:57 +00001279 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001280 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001281
1282
showardd3dc1992009-04-22 21:01:40 +00001283 def abort(self):
showard08a36412009-05-05 01:01:13 +00001284 # abort tasks until the queue is empty or a task ignores the abort
1285 while not self.is_done():
1286 if not self.active_task:
1287 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001288 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001289 if not self.active_task.aborted:
1290 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001291 return
1292 self.active_task = None
1293
showardd3dc1992009-04-22 21:01:40 +00001294
showard77182562009-06-10 00:16:05 +00001295class DelayedCallTask(object):
1296 """
1297 A task object like AgentTask for an Agent to run that waits for the
1298 specified amount of time to have elapsed before calling the supplied
1299 callback once and finishing. If the callback returns anything, it is
1300 assumed to be a new Agent instance and will be added to the dispatcher.
1301
1302 @attribute end_time: The absolute posix time after which this task will
1303 call its callback when it is polled and be finished.
1304
1305 Also has all attributes required by the Agent class.
1306 """
1307 def __init__(self, delay_seconds, callback, now_func=None):
1308 """
1309 @param delay_seconds: The delay in seconds from now that this task
1310 will call the supplied callback and be done.
1311 @param callback: A callable to be called by this task once after at
1312 least delay_seconds time has elapsed. It must return None
1313 or a new Agent instance.
1314 @param now_func: A time.time like function. Default: time.time.
1315 Used for testing.
1316 """
1317 assert delay_seconds > 0
1318 assert callable(callback)
1319 if not now_func:
1320 now_func = time.time
1321 self._now_func = now_func
1322 self._callback = callback
1323
1324 self.end_time = self._now_func() + delay_seconds
1325
1326 # These attributes are required by Agent.
1327 self.aborted = False
1328 self.failure_tasks = ()
1329 self.host_ids = ()
1330 self.success = False
1331 self.queue_entry_ids = ()
1332 # This is filled in by Agent.add_task().
1333 self.agent = None
1334
1335
1336 def poll(self):
1337 if self._callback and self._now_func() >= self.end_time:
1338 new_agent = self._callback()
1339 if new_agent:
1340 self.agent.dispatcher.add_agent(new_agent)
1341 self._callback = None
1342 self.success = True
1343
1344
1345 def is_done(self):
1346 return not self._callback
1347
1348
1349 def abort(self):
1350 self.aborted = True
1351 self._callback = None
1352
1353
mbligh36768f02008-02-22 18:28:33 +00001354class AgentTask(object):
showardd3dc1992009-04-22 21:01:40 +00001355 def __init__(self, cmd, working_directory=None, failure_tasks=[],
1356 pidfile_name=None, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001357 self.done = False
1358 self.failure_tasks = failure_tasks
1359 self.started = False
1360 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001361 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001362 self.task = None
1363 self.agent = None
1364 self.monitor = None
1365 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001366 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001367 self.queue_entry_ids = []
1368 self.host_ids = []
1369 self.log_file = None
1370
1371
1372 def _set_ids(self, host=None, queue_entries=None):
1373 if queue_entries and queue_entries != [None]:
1374 self.host_ids = [entry.host.id for entry in queue_entries]
1375 self.queue_entry_ids = [entry.id for entry in queue_entries]
1376 else:
1377 assert host
1378 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001379
1380
jadmanski0afbb632008-06-06 21:10:57 +00001381 def poll(self):
showard08a36412009-05-05 01:01:13 +00001382 if not self.started:
1383 self.start()
1384 self.tick()
1385
1386
1387 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001388 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001389 exit_code = self.monitor.exit_code()
1390 if exit_code is None:
1391 return
1392 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001393 else:
1394 success = False
mbligh36768f02008-02-22 18:28:33 +00001395
jadmanski0afbb632008-06-06 21:10:57 +00001396 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001397
1398
jadmanski0afbb632008-06-06 21:10:57 +00001399 def is_done(self):
1400 return self.done
mbligh36768f02008-02-22 18:28:33 +00001401
1402
jadmanski0afbb632008-06-06 21:10:57 +00001403 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001404 if self.done:
1405 return
jadmanski0afbb632008-06-06 21:10:57 +00001406 self.done = True
1407 self.success = success
1408 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001409
1410
jadmanski0afbb632008-06-06 21:10:57 +00001411 def prolog(self):
1412 pass
mblighd64e5702008-04-04 21:39:28 +00001413
1414
jadmanski0afbb632008-06-06 21:10:57 +00001415 def create_temp_resultsdir(self, suffix=''):
showard170873e2009-01-07 00:22:26 +00001416 self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')
mblighd64e5702008-04-04 21:39:28 +00001417
mbligh36768f02008-02-22 18:28:33 +00001418
jadmanski0afbb632008-06-06 21:10:57 +00001419 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001420 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001421 _drone_manager.copy_to_results_repository(
1422 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001423
1424
jadmanski0afbb632008-06-06 21:10:57 +00001425 def epilog(self):
1426 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001427
1428
jadmanski0afbb632008-06-06 21:10:57 +00001429 def start(self):
1430 assert self.agent
1431
1432 if not self.started:
1433 self.prolog()
1434 self.run()
1435
1436 self.started = True
1437
1438
1439 def abort(self):
1440 if self.monitor:
1441 self.monitor.kill()
1442 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001443 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001444 self.cleanup()
1445
1446
showard170873e2009-01-07 00:22:26 +00001447 def set_host_log_file(self, base_name, host):
1448 filename = '%s.%s' % (time.time(), base_name)
1449 self.log_file = os.path.join('hosts', host.hostname, filename)
1450
1451
showardde634ee2009-01-30 01:44:24 +00001452 def _get_consistent_execution_tag(self, queue_entries):
1453 first_execution_tag = queue_entries[0].execution_tag()
1454 for queue_entry in queue_entries[1:]:
1455 assert queue_entry.execution_tag() == first_execution_tag, (
1456 '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
1457 queue_entry,
1458 first_execution_tag,
1459 queue_entries[0]))
1460 return first_execution_tag
1461
1462
showarda1e74b32009-05-12 17:32:04 +00001463 def _copy_results(self, queue_entries, use_monitor=None):
showardde634ee2009-01-30 01:44:24 +00001464 assert len(queue_entries) > 0
showard6b733412009-04-27 20:09:18 +00001465 if use_monitor is None:
1466 assert self.monitor
1467 use_monitor = self.monitor
1468 assert use_monitor.has_process()
showardde634ee2009-01-30 01:44:24 +00001469 execution_tag = self._get_consistent_execution_tag(queue_entries)
showard678df4f2009-02-04 21:36:39 +00001470 results_path = execution_tag + '/'
showard6b733412009-04-27 20:09:18 +00001471 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001472 results_path)
showardde634ee2009-01-30 01:44:24 +00001473
showarda1e74b32009-05-12 17:32:04 +00001474
1475 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001476 reparse_task = FinalReparseTask(queue_entries)
1477 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1478
1479
showarda1e74b32009-05-12 17:32:04 +00001480 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1481 self._copy_results(queue_entries, use_monitor)
1482 self._parse_results(queue_entries)
1483
1484
showardd3dc1992009-04-22 21:01:40 +00001485 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
jadmanski0afbb632008-06-06 21:10:57 +00001486 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001487 self.monitor = PidfileRunMonitor()
1488 self.monitor.run(self.cmd, self._working_directory,
1489 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001490 log_file=self.log_file,
1491 pidfile_name=pidfile_name,
1492 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001493
1494
showardd9205182009-04-27 20:09:55 +00001495class TaskWithJobKeyvals(object):
1496 """AgentTask mixin providing functionality to help with job keyval files."""
1497 _KEYVAL_FILE = 'keyval'
1498 def _format_keyval(self, key, value):
1499 return '%s=%s' % (key, value)
1500
1501
1502 def _keyval_path(self):
1503 """Subclasses must override this"""
1504 raise NotImplemented
1505
1506
1507 def _write_keyval_after_job(self, field, value):
1508 assert self.monitor
1509 if not self.monitor.has_process():
1510 return
1511 _drone_manager.write_lines_to_file(
1512 self._keyval_path(), [self._format_keyval(field, value)],
1513 paired_with_process=self.monitor.get_process())
1514
1515
1516 def _job_queued_keyval(self, job):
1517 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1518
1519
1520 def _write_job_finished(self):
1521 self._write_keyval_after_job("job_finished", int(time.time()))
1522
1523
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """
    Runs 'autoserv -R' to repair a host.  On repair failure, the associated
    queue entry (if any, and if not a metahost entry) is marked Failed and
    its partial results are preserved.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        # results go to a temp dir; they're only copied into the job's
        # results directory if the repair fails (see _fail_queue_entry)
        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # return the entry to the queue while the repair is attempted
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # keyvals are written into the temporary repair results directory
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark queue_entry_to_fail as failed, preserving its repair logs."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return  # entry has been aborted

        # write the queued/finished keyvals so the parser sees a full job
        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # parsing of failed repairs is opt-in per job
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001594
1595
showard8fe93b52008-11-18 17:53:22 +00001596class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001597 def epilog(self):
1598 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001599 should_copy_results = (self.queue_entry and not self.success
1600 and not self.queue_entry.meta_host)
1601 if should_copy_results:
1602 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001603 destination = os.path.join(self.queue_entry.execution_tag(),
1604 os.path.basename(self.log_file))
1605 _drone_manager.copy_to_results_repository(
1606 self.monitor.get_process(), self.log_file,
1607 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001608
1609
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host; spawns a RepairTask on failure."""

    def __init__(self, queue_entry=None, host=None):
        """Exactly one of queue_entry or host must be supplied."""
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
                self.host.hostname, self.temp_results_dir, ['-v'],
                queue_entry=queue_entry)
        # if the verify fails, attempt to repair the host
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        # on failure, the RepairTask in failure_tasks takes over the host
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001640
1641
showardd9205182009-04-27 20:09:55 +00001642class QueueTask(AgentTask, TaskWithJobKeyvals):
showardf1ae3542009-05-11 19:26:02 +00001643 def __init__(self, job, queue_entries, cmd, group_name=''):
jadmanski0afbb632008-06-06 21:10:57 +00001644 self.job = job
1645 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001646 self.group_name = group_name
showard170873e2009-01-07 00:22:26 +00001647 super(QueueTask, self).__init__(cmd, self._execution_tag())
1648 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001649
1650
showard73ec0442009-02-07 02:05:20 +00001651 def _keyval_path(self):
showardd9205182009-04-27 20:09:55 +00001652 return os.path.join(self._execution_tag(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001653
1654
1655 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1656 keyval_contents = '\n'.join(self._format_keyval(key, value)
1657 for key, value in keyval_dict.iteritems())
1658 # always end with a newline to allow additional keyvals to be written
1659 keyval_contents += '\n'
1660 _drone_manager.attach_file_to_execution(self._execution_tag(),
1661 keyval_contents,
1662 file_path=keyval_path)
1663
1664
1665 def _write_keyvals_before_job(self, keyval_dict):
1666 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1667
1668
showard170873e2009-01-07 00:22:26 +00001669 def _write_host_keyvals(self, host):
1670 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1671 host.hostname)
1672 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001673 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1674 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001675
1676
showard170873e2009-01-07 00:22:26 +00001677 def _execution_tag(self):
1678 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001679
1680
jadmanski0afbb632008-06-06 21:10:57 +00001681 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001682 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001683 keyval_dict = {queued_key: queued_time}
1684 if self.group_name:
1685 keyval_dict['host_group_name'] = self.group_name
1686 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001687 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001688 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001689 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001690 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001691 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001692 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001693 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001694 assert len(self.queue_entries) == 1
1695 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001696
1697
showard35162b02009-03-03 02:17:30 +00001698 def _write_lost_process_error_file(self):
1699 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1700 _drone_manager.write_lines_to_file(error_file_path,
1701 [_LOST_PROCESS_ERROR])
1702
1703
showardd3dc1992009-04-22 21:01:40 +00001704 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001705 if not self.monitor:
1706 return
1707
showardd9205182009-04-27 20:09:55 +00001708 self._write_job_finished()
1709
showardd3dc1992009-04-22 21:01:40 +00001710 # both of these conditionals can be true, iff the process ran, wrote a
1711 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001712 if self.monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +00001713 gather_task = GatherLogsTask(self.job, self.queue_entries)
1714 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001715
1716 if self.monitor.lost_process:
1717 self._write_lost_process_error_file()
1718 for queue_entry in self.queue_entries:
1719 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001720
1721
showardcbd74612008-11-19 21:42:02 +00001722 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001723 _drone_manager.write_lines_to_file(
1724 os.path.join(self._execution_tag(), 'status.log'),
1725 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001726 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001727
1728
jadmanskif7fa2cc2008-10-01 14:13:23 +00001729 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001730 if not self.monitor or not self.monitor.has_process():
1731 return
1732
jadmanskif7fa2cc2008-10-01 14:13:23 +00001733 # build up sets of all the aborted_by and aborted_on values
1734 aborted_by, aborted_on = set(), set()
1735 for queue_entry in self.queue_entries:
1736 if queue_entry.aborted_by:
1737 aborted_by.add(queue_entry.aborted_by)
1738 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1739 aborted_on.add(t)
1740
1741 # extract some actual, unique aborted by value and write it out
1742 assert len(aborted_by) <= 1
1743 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001744 aborted_by_value = aborted_by.pop()
1745 aborted_on_value = max(aborted_on)
1746 else:
1747 aborted_by_value = 'autotest_system'
1748 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001749
showarda0382352009-02-11 23:36:43 +00001750 self._write_keyval_after_job("aborted_by", aborted_by_value)
1751 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001752
showardcbd74612008-11-19 21:42:02 +00001753 aborted_on_string = str(datetime.datetime.fromtimestamp(
1754 aborted_on_value))
1755 self._write_status_comment('Job aborted by %s on %s' %
1756 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001757
1758
jadmanski0afbb632008-06-06 21:10:57 +00001759 def abort(self):
1760 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001761 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001762 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001763
1764
jadmanski0afbb632008-06-06 21:10:57 +00001765 def epilog(self):
1766 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001767 self._finish_task()
1768 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001769
1770
mblighbb421852008-03-11 22:36:16 +00001771class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001772 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001773 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
showard5add1c82009-05-26 19:27:46 +00001774 self.monitor = run_monitor
1775 self.started = True
1776 # since we set started=True here, prolog() and run() shouldn't be called
mblighbb421852008-03-11 22:36:16 +00001777
1778
jadmanski0afbb632008-06-06 21:10:57 +00001779 def run(self):
showard5add1c82009-05-26 19:27:46 +00001780 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001781
1782
jadmanski0afbb632008-06-06 21:10:57 +00001783 def prolog(self):
showard5add1c82009-05-26 19:27:46 +00001784 raise NotImplemented('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001785
1786
showardd3dc1992009-04-22 21:01:40 +00001787class PostJobTask(AgentTask):
1788 def __init__(self, queue_entries, pidfile_name, logfile_name,
1789 run_monitor=None):
1790 """
1791 If run_monitor != None, we're recovering a running task.
1792 """
1793 self._queue_entries = queue_entries
1794 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001795
1796 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1797 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1798 self._autoserv_monitor = PidfileRunMonitor()
1799 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1800 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1801
1802 if _testing_mode:
1803 command = 'true'
1804 else:
1805 command = self._generate_command(self._results_dir)
1806
1807 super(PostJobTask, self).__init__(cmd=command,
1808 working_directory=self._execution_tag)
showard5add1c82009-05-26 19:27:46 +00001809 # this must happen *after* the super call
1810 self.monitor = run_monitor
1811 if run_monitor:
1812 self.started = True
showardd3dc1992009-04-22 21:01:40 +00001813
1814 self.log_file = os.path.join(self._execution_tag, logfile_name)
1815 self._final_status = self._determine_final_status()
1816
1817
1818 def _generate_command(self, results_dir):
1819 raise NotImplementedError('Subclasses must override this')
1820
1821
1822 def _job_was_aborted(self):
1823 was_aborted = None
1824 for queue_entry in self._queue_entries:
1825 queue_entry.update_from_database()
1826 if was_aborted is None: # first queue entry
1827 was_aborted = bool(queue_entry.aborted)
1828 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1829 email_manager.manager.enqueue_notify_email(
1830 'Inconsistent abort state',
1831 'Queue entries have inconsistent abort state: ' +
1832 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1833 # don't crash here, just assume true
1834 return True
1835 return was_aborted
1836
1837
1838 def _determine_final_status(self):
1839 if self._job_was_aborted():
1840 return models.HostQueueEntry.Status.ABORTED
1841
1842 # we'll use a PidfileRunMonitor to read the autoserv exit status
1843 if self._autoserv_monitor.exit_code() == 0:
1844 return models.HostQueueEntry.Status.COMPLETED
1845 return models.HostQueueEntry.Status.FAILED
1846
1847
1848 def run(self):
showard5add1c82009-05-26 19:27:46 +00001849 assert not self.monitor
showardd3dc1992009-04-22 21:01:40 +00001850
showard5add1c82009-05-26 19:27:46 +00001851 # make sure we actually have results to work with.
1852 # this should never happen in normal operation.
1853 if not self._autoserv_monitor.has_process():
1854 email_manager.manager.enqueue_notify_email(
1855 'No results in post-job task',
1856 'No results in post-job task at %s' %
1857 self._autoserv_monitor.pidfile_id)
1858 self.finished(False)
1859 return
1860
1861 super(PostJobTask, self).run(
1862 pidfile_name=self._pidfile_name,
1863 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00001864
1865
1866 def _set_all_statuses(self, status):
1867 for queue_entry in self._queue_entries:
1868 queue_entry.set_status(status)
1869
1870
1871 def abort(self):
1872 # override AgentTask.abort() to avoid killing the process and ending
1873 # the task. post-job tasks continue when the job is aborted.
1874 pass
1875
1876
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        """
        @param run_monitor: if not None, recover an already-running
                collect_crashinfo process instead of starting a new one.
        """
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in collect-crashinfo mode against all job hosts
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Spawn CleanupTasks (or mark hosts Ready) per the job's policy."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        # aborted jobs always reboot; otherwise follow the reboot_after policy
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # only copy/parse if the main autoserv process actually produced
        # results to work with
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally -- no crashinfo to collect
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001944
1945
showard8fe93b52008-11-18 17:53:22 +00001946class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001947 def __init__(self, host=None, queue_entry=None):
1948 assert bool(host) ^ bool(queue_entry)
1949 if queue_entry:
1950 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001951 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001952 self.host = host
showard170873e2009-01-07 00:22:26 +00001953
1954 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001955 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1956 ['--cleanup'],
1957 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001958 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001959 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1960 failure_tasks=[repair_task])
1961
1962 self._set_ids(host=host, queue_entries=[queue_entry])
1963 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001964
mblighd5c95802008-03-05 00:33:46 +00001965
jadmanski0afbb632008-06-06 21:10:57 +00001966 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001967 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001968 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001969 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001970
mblighd5c95802008-03-05 00:33:46 +00001971
showard21baa452008-10-21 00:08:39 +00001972 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001973 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001974 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001975 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001976 self.host.update_field('dirty', 0)
1977
1978
showardd3dc1992009-04-22 21:01:40 +00001979class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001980 _num_running_parses = 0
1981
showardd3dc1992009-04-22 21:01:40 +00001982 def __init__(self, queue_entries, run_monitor=None):
1983 super(FinalReparseTask, self).__init__(queue_entries,
1984 pidfile_name=_PARSER_PID_FILE,
1985 logfile_name='.parse.log',
1986 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001987 # don't use _set_ids, since we don't want to set the host_ids
1988 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00001989 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00001990
showard97aed502008-11-04 02:01:24 +00001991
1992 @classmethod
1993 def _increment_running_parses(cls):
1994 cls._num_running_parses += 1
1995
1996
1997 @classmethod
1998 def _decrement_running_parses(cls):
1999 cls._num_running_parses -= 1
2000
2001
2002 @classmethod
2003 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002004 return (cls._num_running_parses <
2005 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002006
2007
2008 def prolog(self):
2009 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002010 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002011
2012
2013 def epilog(self):
2014 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002015 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002016
2017
showardd3dc1992009-04-22 21:01:40 +00002018 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002019 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002020 results_dir]
showard97aed502008-11-04 02:01:24 +00002021
2022
showard08a36412009-05-05 01:01:13 +00002023 def tick(self):
2024 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002025 # and we can, at which point we revert to default behavior
2026 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002027 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002028 else:
2029 self._try_starting_parse()
2030
2031
2032 def run(self):
2033 # override run() to not actually run unless we can
2034 self._try_starting_parse()
2035
2036
2037 def _try_starting_parse(self):
2038 if not self._can_run_new_parse():
2039 return
showard170873e2009-01-07 00:22:26 +00002040
showard97aed502008-11-04 02:01:24 +00002041 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002042 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002043
showard97aed502008-11-04 02:01:24 +00002044 self._increment_running_parses()
2045 self._parse_started = True
2046
2047
2048 def finished(self, success):
2049 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002050 if self._parse_started:
2051 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002052
2053
showardc9ae1782009-01-30 01:42:37 +00002054class SetEntryPendingTask(AgentTask):
2055 def __init__(self, queue_entry):
2056 super(SetEntryPendingTask, self).__init__(cmd='')
2057 self._queue_entry = queue_entry
2058 self._set_ids(queue_entries=[queue_entry])
2059
2060
2061 def run(self):
2062 agent = self._queue_entry.on_pending()
2063 if agent:
2064 self.agent.dispatcher.add_agent(agent)
2065 self.finished(True)
2066
2067
showarda3c58572009-03-12 20:36:59 +00002068class DBError(Exception):
2069 """Raised by the DBObject constructor when its select fails."""
2070
2071
mbligh36768f02008-02-22 18:28:33 +00002072class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002073 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002074
2075 # Subclasses MUST override these:
2076 _table_name = ''
2077 _fields = ()
2078
showarda3c58572009-03-12 20:36:59 +00002079 # A mapping from (type, id) to the instance of the object for that
2080 # particular id. This prevents us from creating new Job() and Host()
2081 # instances for every HostQueueEntry object that we instantiate as
2082 # multiple HQEs often share the same Job.
2083 _instances_by_type_and_id = weakref.WeakValueDictionary()
2084 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002085
showarda3c58572009-03-12 20:36:59 +00002086
2087 def __new__(cls, id=None, **kwargs):
2088 """
2089 Look to see if we already have an instance for this particular type
2090 and id. If so, use it instead of creating a duplicate instance.
2091 """
2092 if id is not None:
2093 instance = cls._instances_by_type_and_id.get((cls, id))
2094 if instance:
2095 return instance
2096 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2097
2098
2099 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002100 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002101 assert self._table_name, '_table_name must be defined in your class'
2102 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002103 if not new_record:
2104 if self._initialized and not always_query:
2105 return # We've already been initialized.
2106 if id is None:
2107 id = row[0]
2108 # Tell future constructors to use us instead of re-querying while
2109 # this instance is still around.
2110 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002111
showard6ae5ea92009-02-25 00:11:51 +00002112 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002113
jadmanski0afbb632008-06-06 21:10:57 +00002114 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002115
jadmanski0afbb632008-06-06 21:10:57 +00002116 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002117 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002118
showarda3c58572009-03-12 20:36:59 +00002119 if self._initialized:
2120 differences = self._compare_fields_in_row(row)
2121 if differences:
showard7629f142009-03-27 21:02:02 +00002122 logging.warn(
2123 'initialized %s %s instance requery is updating: %s',
2124 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002125 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002126 self._initialized = True
2127
2128
2129 @classmethod
2130 def _clear_instance_cache(cls):
2131 """Used for testing, clear the internal instance cache."""
2132 cls._instances_by_type_and_id.clear()
2133
2134
showardccbd6c52009-03-21 00:10:21 +00002135 def _fetch_row_from_db(self, row_id):
2136 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2137 rows = _db.execute(sql, (row_id,))
2138 if not rows:
showard76e29d12009-04-15 21:53:10 +00002139 raise DBError("row not found (table=%s, row id=%s)"
2140 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002141 return rows[0]
2142
2143
showarda3c58572009-03-12 20:36:59 +00002144 def _assert_row_length(self, row):
2145 assert len(row) == len(self._fields), (
2146 "table = %s, row = %s/%d, fields = %s/%d" % (
2147 self.__table, row, len(row), self._fields, len(self._fields)))
2148
2149
2150 def _compare_fields_in_row(self, row):
2151 """
2152 Given a row as returned by a SELECT query, compare it to our existing
2153 in memory fields.
2154
2155 @param row - A sequence of values corresponding to fields named in
2156 The class attribute _fields.
2157
2158 @returns A dictionary listing the differences keyed by field name
2159 containing tuples of (current_value, row_value).
2160 """
2161 self._assert_row_length(row)
2162 differences = {}
2163 for field, row_value in itertools.izip(self._fields, row):
2164 current_value = getattr(self, field)
2165 if current_value != row_value:
2166 differences[field] = (current_value, row_value)
2167 return differences
showard2bab8f42008-11-12 18:15:22 +00002168
2169
2170 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002171 """
2172 Update our field attributes using a single row returned by SELECT.
2173
2174 @param row - A sequence of values corresponding to fields named in
2175 the class fields list.
2176 """
2177 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002178
showard2bab8f42008-11-12 18:15:22 +00002179 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002180 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002181 setattr(self, field, value)
2182 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002183
showard2bab8f42008-11-12 18:15:22 +00002184 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002185
mblighe2586682008-02-29 22:45:46 +00002186
showardccbd6c52009-03-21 00:10:21 +00002187 def update_from_database(self):
2188 assert self.id is not None
2189 row = self._fetch_row_from_db(self.id)
2190 self._update_fields_from_row(row)
2191
2192
jadmanski0afbb632008-06-06 21:10:57 +00002193 def count(self, where, table = None):
2194 if not table:
2195 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002196
jadmanski0afbb632008-06-06 21:10:57 +00002197 rows = _db.execute("""
2198 SELECT count(*) FROM %s
2199 WHERE %s
2200 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002201
jadmanski0afbb632008-06-06 21:10:57 +00002202 assert len(rows) == 1
2203
2204 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002205
2206
showardd3dc1992009-04-22 21:01:40 +00002207 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002208 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002209
showard2bab8f42008-11-12 18:15:22 +00002210 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002211 return
mbligh36768f02008-02-22 18:28:33 +00002212
mblighf8c624d2008-07-03 16:58:45 +00002213 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002214 _db.execute(query, (value, self.id))
2215
showard2bab8f42008-11-12 18:15:22 +00002216 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002217
2218
jadmanski0afbb632008-06-06 21:10:57 +00002219 def save(self):
2220 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002221 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002222 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002223 values = []
2224 for key in keys:
2225 value = getattr(self, key)
2226 if value is None:
2227 values.append('NULL')
2228 else:
2229 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002230 values_str = ','.join(values)
2231 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2232 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002233 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002234 # Update our id to the one the database just assigned to us.
2235 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002236
2237
jadmanski0afbb632008-06-06 21:10:57 +00002238 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002239 self._instances_by_type_and_id.pop((type(self), id), None)
2240 self._initialized = False
2241 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002242 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2243 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002244
2245
showard63a34772008-08-18 19:32:50 +00002246 @staticmethod
2247 def _prefix_with(string, prefix):
2248 if string:
2249 string = prefix + string
2250 return string
2251
2252
jadmanski0afbb632008-06-06 21:10:57 +00002253 @classmethod
showard989f25d2008-10-01 11:38:11 +00002254 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002255 """
2256 Construct instances of our class based on the given database query.
2257
2258 @yields One class instance for each row fetched.
2259 """
showard63a34772008-08-18 19:32:50 +00002260 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2261 where = cls._prefix_with(where, 'WHERE ')
2262 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002263 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002264 'joins' : joins,
2265 'where' : where,
2266 'order_by' : order_by})
2267 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002268 for row in rows:
2269 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002270
mbligh36768f02008-02-22 18:28:33 +00002271
class IneligibleHostQueue(DBObject):
    """
    A (job, host) block row; created by HostQueueEntry.block_host to keep a
    host from being assigned to the same job again.
    """
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002275
2276
showard89f84db2009-03-12 20:39:13 +00002277class AtomicGroup(DBObject):
2278 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002279 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2280 'invalid')
showard89f84db2009-03-12 20:39:13 +00002281
2282
showard989f25d2008-10-01 11:38:11 +00002283class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002284 _table_name = 'labels'
2285 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002286 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002287
2288
mbligh36768f02008-02-22 18:28:33 +00002289class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002290 _table_name = 'hosts'
2291 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2292 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2293
2294
jadmanski0afbb632008-06-06 21:10:57 +00002295 def current_task(self):
2296 rows = _db.execute("""
2297 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2298 """, (self.id,))
2299
2300 if len(rows) == 0:
2301 return None
2302 else:
2303 assert len(rows) == 1
2304 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002305 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002306
2307
jadmanski0afbb632008-06-06 21:10:57 +00002308 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002309 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002310 if self.current_task():
2311 self.current_task().requeue()
2312
showard6ae5ea92009-02-25 00:11:51 +00002313
jadmanski0afbb632008-06-06 21:10:57 +00002314 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002315 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002316 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002317
2318
showard170873e2009-01-07 00:22:26 +00002319 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002320 """
showard170873e2009-01-07 00:22:26 +00002321 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002322 """
2323 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002324 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002325 FROM labels
2326 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002327 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002328 ORDER BY labels.name
2329 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002330 platform = None
2331 all_labels = []
2332 for label_name, is_platform in rows:
2333 if is_platform:
2334 platform = label_name
2335 all_labels.append(label_name)
2336 return platform, all_labels
2337
2338
showarda64e52a2009-06-08 23:24:08 +00002339 def reverify_tasks(self, cleanup=True):
2340 tasks = [VerifyTask(host=self)]
2341 if cleanup:
2342 tasks.insert(0, CleanupTask(host=self))
showard170873e2009-01-07 00:22:26 +00002343 # just to make sure this host does not get taken away
2344 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002345 return tasks
showardd8e548a2008-09-09 03:04:57 +00002346
2347
showard54c1ea92009-05-20 00:32:58 +00002348 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2349
2350
2351 @classmethod
2352 def cmp_for_sort(cls, a, b):
2353 """
2354 A comparison function for sorting Host objects by hostname.
2355
2356 This strips any trailing numeric digits, ignores leading 0s and
2357 compares hostnames by the leading name and the trailing digits as a
2358 number. If both hostnames do not match this pattern, they are simply
2359 compared as lower case strings.
2360
2361 Example of how hostnames will be sorted:
2362
2363 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2364
2365 This hopefully satisfy most people's hostname sorting needs regardless
2366 of their exact naming schemes. Nobody sane should have both a host10
2367 and host010 (but the algorithm works regardless).
2368 """
2369 lower_a = a.hostname.lower()
2370 lower_b = b.hostname.lower()
2371 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2372 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2373 if match_a and match_b:
2374 name_a, number_a_str = match_a.groups()
2375 name_b, number_b_str = match_b.groups()
2376 number_a = int(number_a_str.lstrip('0'))
2377 number_b = int(number_b_str.lstrip('0'))
2378 result = cmp((name_a, number_a), (name_b, number_b))
2379 if result == 0 and lower_a != lower_b:
2380 # If they compared equal above but the lower case names are
2381 # indeed different, don't report equality. abc012 != abc12.
2382 return cmp(lower_a, lower_b)
2383 return result
2384 else:
2385 return cmp(lower_a, lower_b)
2386
2387
mbligh36768f02008-02-22 18:28:33 +00002388class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002389 _table_name = 'host_queue_entries'
2390 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002391 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002392 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002393
2394
showarda3c58572009-03-12 20:36:59 +00002395 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002396 assert id or row
showarda3c58572009-03-12 20:36:59 +00002397 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002398 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002399
jadmanski0afbb632008-06-06 21:10:57 +00002400 if self.host_id:
2401 self.host = Host(self.host_id)
2402 else:
2403 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002404
showard77182562009-06-10 00:16:05 +00002405 if self.atomic_group_id:
2406 self.atomic_group = AtomicGroup(self.atomic_group_id,
2407 always_query=False)
2408 else:
2409 self.atomic_group = None
2410
showard170873e2009-01-07 00:22:26 +00002411 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002412 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002413
2414
showard89f84db2009-03-12 20:39:13 +00002415 @classmethod
2416 def clone(cls, template):
2417 """
2418 Creates a new row using the values from a template instance.
2419
2420 The new instance will not exist in the database or have a valid
2421 id attribute until its save() method is called.
2422 """
2423 assert isinstance(template, cls)
2424 new_row = [getattr(template, field) for field in cls._fields]
2425 clone = cls(row=new_row, new_record=True)
2426 clone.id = None
2427 return clone
2428
2429
showardc85c21b2008-11-24 22:17:37 +00002430 def _view_job_url(self):
2431 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2432
2433
showardf1ae3542009-05-11 19:26:02 +00002434 def get_labels(self):
2435 """
2436 Get all labels associated with this host queue entry (either via the
2437 meta_host or as a job dependency label). The labels yielded are not
2438 guaranteed to be unique.
2439
2440 @yields Label instances associated with this host_queue_entry.
2441 """
2442 if self.meta_host:
2443 yield Label(id=self.meta_host, always_query=False)
2444 labels = Label.fetch(
2445 joins="JOIN jobs_dependency_labels AS deps "
2446 "ON (labels.id = deps.label_id)",
2447 where="deps.job_id = %d" % self.job.id)
2448 for label in labels:
2449 yield label
2450
2451
jadmanski0afbb632008-06-06 21:10:57 +00002452 def set_host(self, host):
2453 if host:
2454 self.queue_log_record('Assigning host ' + host.hostname)
2455 self.update_field('host_id', host.id)
2456 self.update_field('active', True)
2457 self.block_host(host.id)
2458 else:
2459 self.queue_log_record('Releasing host')
2460 self.unblock_host(self.host.id)
2461 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002462
jadmanski0afbb632008-06-06 21:10:57 +00002463 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002464
2465
jadmanski0afbb632008-06-06 21:10:57 +00002466 def get_host(self):
2467 return self.host
mbligh36768f02008-02-22 18:28:33 +00002468
2469
jadmanski0afbb632008-06-06 21:10:57 +00002470 def queue_log_record(self, log_line):
2471 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002472 _drone_manager.write_lines_to_file(self.queue_log_path,
2473 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002474
2475
jadmanski0afbb632008-06-06 21:10:57 +00002476 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002477 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002478 row = [0, self.job.id, host_id]
2479 block = IneligibleHostQueue(row=row, new_record=True)
2480 block.save()
mblighe2586682008-02-29 22:45:46 +00002481
2482
jadmanski0afbb632008-06-06 21:10:57 +00002483 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002484 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002485 blocks = IneligibleHostQueue.fetch(
2486 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2487 for block in blocks:
2488 block.delete()
mblighe2586682008-02-29 22:45:46 +00002489
2490
showard2bab8f42008-11-12 18:15:22 +00002491 def set_execution_subdir(self, subdir=None):
2492 if subdir is None:
2493 assert self.get_host()
2494 subdir = self.get_host().hostname
2495 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002496
2497
showard6355f6b2008-12-05 18:52:13 +00002498 def _get_hostname(self):
2499 if self.host:
2500 return self.host.hostname
2501 return 'no host'
2502
2503
showard170873e2009-01-07 00:22:26 +00002504 def __str__(self):
2505 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2506
2507
jadmanski0afbb632008-06-06 21:10:57 +00002508 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002509 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002510
showardb18134f2009-03-20 20:52:18 +00002511 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002512
showardc85c21b2008-11-24 22:17:37 +00002513 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002514 self.update_field('complete', False)
2515 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002516
jadmanski0afbb632008-06-06 21:10:57 +00002517 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002518 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002519 self.update_field('complete', False)
2520 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002521
showardc85c21b2008-11-24 22:17:37 +00002522 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002523 self.update_field('complete', True)
2524 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002525
2526 should_email_status = (status.lower() in _notify_email_statuses or
2527 'all' in _notify_email_statuses)
2528 if should_email_status:
2529 self._email_on_status(status)
2530
2531 self._email_on_job_complete()
2532
2533
2534 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002535 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002536
2537 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2538 self.job.id, self.job.name, hostname, status)
2539 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2540 self.job.id, self.job.name, hostname, status,
2541 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002542 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002543
2544
2545 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002546 if not self.job.is_finished():
2547 return
showard542e8402008-09-19 20:16:18 +00002548
showardc85c21b2008-11-24 22:17:37 +00002549 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002550 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002551 for queue_entry in hosts_queue:
2552 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002553 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002554 queue_entry.status))
2555
2556 summary_text = "\n".join(summary_text)
2557 status_counts = models.Job.objects.get_status_counts(
2558 [self.job.id])[self.job.id]
2559 status = ', '.join('%d %s' % (count, status) for status, count
2560 in status_counts.iteritems())
2561
2562 subject = 'Autotest: Job ID: %s "%s" %s' % (
2563 self.job.id, self.job.name, status)
2564 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2565 self.job.id, self.job.name, status, self._view_job_url(),
2566 summary_text)
showard170873e2009-01-07 00:22:26 +00002567 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002568
2569
showard77182562009-06-10 00:16:05 +00002570 def run_pre_job_tasks(self, assigned_host=None):
2571 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002572 assert assigned_host
2573 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002574 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002575
showardb18134f2009-03-20 20:52:18 +00002576 logging.info("%s/%s/%s scheduled on %s, status=%s",
2577 self.job.name, self.meta_host, self.atomic_group_id,
2578 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002579
showard77182562009-06-10 00:16:05 +00002580 return self._do_run_pre_job_tasks()
2581
2582
2583 def _do_run_pre_job_tasks(self):
2584 # Every host goes thru the Verifying stage (which may or may not
2585 # actually do anything as determined by get_pre_job_tasks).
2586 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2587
2588 # The pre job tasks always end with a SetEntryPendingTask which
2589 # will continue as appropriate through queue_entry.on_pending().
2590 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002591
showard6ae5ea92009-02-25 00:11:51 +00002592
jadmanski0afbb632008-06-06 21:10:57 +00002593 def requeue(self):
2594 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002595 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002596 # verify/cleanup failure sets the execution subdir, so reset it here
2597 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002598 if self.meta_host:
2599 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002600
2601
jadmanski0afbb632008-06-06 21:10:57 +00002602 def handle_host_failure(self):
2603 """\
2604 Called when this queue entry's host has failed verification and
2605 repair.
2606 """
2607 assert not self.meta_host
2608 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002609 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002610
2611
jadmanskif7fa2cc2008-10-01 14:13:23 +00002612 @property
2613 def aborted_by(self):
2614 self._load_abort_info()
2615 return self._aborted_by
2616
2617
2618 @property
2619 def aborted_on(self):
2620 self._load_abort_info()
2621 return self._aborted_on
2622
2623
2624 def _load_abort_info(self):
2625 """ Fetch info about who aborted the job. """
2626 if hasattr(self, "_aborted_by"):
2627 return
2628 rows = _db.execute("""
2629 SELECT users.login, aborted_host_queue_entries.aborted_on
2630 FROM aborted_host_queue_entries
2631 INNER JOIN users
2632 ON users.id = aborted_host_queue_entries.aborted_by_id
2633 WHERE aborted_host_queue_entries.queue_entry_id = %s
2634 """, (self.id,))
2635 if rows:
2636 self._aborted_by, self._aborted_on = rows[0]
2637 else:
2638 self._aborted_by = self._aborted_on = None
2639
2640
showardb2e2c322008-10-14 17:33:55 +00002641 def on_pending(self):
2642 """
2643 Called when an entry in a synchronous job has passed verify. If the
2644 job is ready to run, returns an agent to run the job. Returns None
2645 otherwise.
2646 """
2647 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002648 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002649 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002650
2651
showardd3dc1992009-04-22 21:01:40 +00002652 def abort(self, dispatcher):
2653 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002654
showardd3dc1992009-04-22 21:01:40 +00002655 Status = models.HostQueueEntry.Status
2656 has_running_job_agent = (
2657 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2658 and dispatcher.get_agents_for_entry(self))
2659 if has_running_job_agent:
2660 # do nothing; post-job tasks will finish and then mark this entry
2661 # with status "Aborted" and take care of the host
2662 return
2663
2664 if self.status in (Status.STARTING, Status.PENDING):
2665 self.host.set_status(models.Host.Status.READY)
2666 elif self.status == Status.VERIFYING:
2667 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2668
2669 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002670
2671 def execution_tag(self):
2672 assert self.execution_subdir
2673 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002674
2675
mbligh36768f02008-02-22 18:28:33 +00002676class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002677 _table_name = 'jobs'
2678 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2679 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002680 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002681 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002682
showard77182562009-06-10 00:16:05 +00002683 # This does not need to be a column in the DB. The delays are likely to
2684 # be configured short. If the scheduler is stopped and restarted in
2685 # the middle of a job's delay cycle, the delay cycle will either be
2686 # repeated or skipped depending on the number of Pending machines found
2687 # when the restarted scheduler recovers to track it. Not a problem.
2688 #
2689 # A reference to the DelayedCallTask that will wake up the job should
2690 # no other HQEs change state in time. Its end_time attribute is used
2691 # by our run_with_ready_delay() method to determine if the wait is over.
2692 _delay_ready_task = None
2693
2694 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2695 # all status='Pending' atomic group HQEs incase a delay was running when the
2696 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002697
showarda3c58572009-03-12 20:36:59 +00002698 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002699 assert id or row
showarda3c58572009-03-12 20:36:59 +00002700 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002701
mblighe2586682008-02-29 22:45:46 +00002702
jadmanski0afbb632008-06-06 21:10:57 +00002703 def is_server_job(self):
2704 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002705
2706
showard170873e2009-01-07 00:22:26 +00002707 def tag(self):
2708 return "%s-%s" % (self.id, self.owner)
2709
2710
jadmanski0afbb632008-06-06 21:10:57 +00002711 def get_host_queue_entries(self):
2712 rows = _db.execute("""
2713 SELECT * FROM host_queue_entries
2714 WHERE job_id= %s
2715 """, (self.id,))
2716 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002717
jadmanski0afbb632008-06-06 21:10:57 +00002718 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002719
jadmanski0afbb632008-06-06 21:10:57 +00002720 return entries
mbligh36768f02008-02-22 18:28:33 +00002721
2722
jadmanski0afbb632008-06-06 21:10:57 +00002723 def set_status(self, status, update_queues=False):
2724 self.update_field('status',status)
2725
2726 if update_queues:
2727 for queue_entry in self.get_host_queue_entries():
2728 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002729
2730
showard77182562009-06-10 00:16:05 +00002731 def _atomic_and_has_started(self):
2732 """
2733 @returns True if any of the HostQueueEntries associated with this job
2734 have entered the Status.STARTING state or beyond.
2735 """
2736 atomic_entries = models.HostQueueEntry.objects.filter(
2737 job=self.id, atomic_group__isnull=False)
2738 if atomic_entries.count() <= 0:
2739 return False
2740
2741 non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
2742 models.HostQueueEntry.Status.VERIFYING,
2743 models.HostQueueEntry.Status.PENDING)
2744 started_entries = atomic_entries.exclude(
2745 status__in=non_started_statuses)
2746 return started_entries.count() > 0
2747
2748
2749 def _pending_count(self):
2750 """The number of HostQueueEntries for this job in the Pending state."""
2751 pending_entries = models.HostQueueEntry.objects.filter(
2752 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2753 return pending_entries.count()
2754
2755
jadmanski0afbb632008-06-06 21:10:57 +00002756 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002757 # NOTE: Atomic group jobs stop reporting ready after they have been
2758 # started to avoid launching multiple copies of one atomic job.
2759 # Only possible if synch_count is less than than half the number of
2760 # machines in the atomic group.
2761 return (self._pending_count() >= self.synch_count
2762 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002763
2764
jadmanski0afbb632008-06-06 21:10:57 +00002765 def num_machines(self, clause = None):
2766 sql = "job_id=%s" % self.id
2767 if clause:
2768 sql += " AND (%s)" % clause
2769 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002770
2771
jadmanski0afbb632008-06-06 21:10:57 +00002772 def num_queued(self):
2773 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002774
2775
jadmanski0afbb632008-06-06 21:10:57 +00002776 def num_active(self):
2777 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002778
2779
jadmanski0afbb632008-06-06 21:10:57 +00002780 def num_complete(self):
2781 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002782
2783
jadmanski0afbb632008-06-06 21:10:57 +00002784 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002785 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002786
mbligh36768f02008-02-22 18:28:33 +00002787
showard6bb7c292009-01-30 01:44:51 +00002788 def _not_yet_run_entries(self, include_verifying=True):
2789 statuses = [models.HostQueueEntry.Status.QUEUED,
2790 models.HostQueueEntry.Status.PENDING]
2791 if include_verifying:
2792 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2793 return models.HostQueueEntry.objects.filter(job=self.id,
2794 status__in=statuses)
2795
2796
2797 def _stop_all_entries(self):
2798 entries_to_stop = self._not_yet_run_entries(
2799 include_verifying=False)
2800 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002801 assert not child_entry.complete, (
2802 '%s status=%s, active=%s, complete=%s' %
2803 (child_entry.id, child_entry.status, child_entry.active,
2804 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002805 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2806 child_entry.host.status = models.Host.Status.READY
2807 child_entry.host.save()
2808 child_entry.status = models.HostQueueEntry.Status.STOPPED
2809 child_entry.save()
2810
showard2bab8f42008-11-12 18:15:22 +00002811 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002812 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002813 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002814 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002815
2816
jadmanski0afbb632008-06-06 21:10:57 +00002817 def write_to_machines_file(self, queue_entry):
2818 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002819 file_path = os.path.join(self.tag(), '.machines')
2820 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002821
2822
showardf1ae3542009-05-11 19:26:02 +00002823 def _next_group_name(self, group_name=''):
2824 """@returns a directory name to use for the next host group results."""
2825 if group_name:
2826 # Sanitize for use as a pathname.
2827 group_name = group_name.replace(os.path.sep, '_')
2828 if group_name.startswith('.'):
2829 group_name = '_' + group_name[1:]
2830 # Add a separator between the group name and 'group%d'.
2831 group_name += '.'
2832 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002833 query = models.HostQueueEntry.objects.filter(
2834 job=self.id).values('execution_subdir').distinct()
2835 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002836 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2837 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002838 if ids:
2839 next_id = max(ids) + 1
2840 else:
2841 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002842 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002843
2844
showard170873e2009-01-07 00:22:26 +00002845 def _write_control_file(self, execution_tag):
2846 control_path = _drone_manager.attach_file_to_execution(
2847 execution_tag, self.control_file)
2848 return control_path
mbligh36768f02008-02-22 18:28:33 +00002849
showardb2e2c322008-10-14 17:33:55 +00002850
showard2bab8f42008-11-12 18:15:22 +00002851 def get_group_entries(self, queue_entry_from_group):
2852 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002853 return list(HostQueueEntry.fetch(
2854 where='job_id=%s AND execution_subdir=%s',
2855 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002856
2857
showardb2e2c322008-10-14 17:33:55 +00002858 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002859 assert queue_entries
2860 execution_tag = queue_entries[0].execution_tag()
2861 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002862 hostnames = ','.join([entry.get_host().hostname
2863 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002864
showard87ba02a2009-04-20 19:37:32 +00002865 params = _autoserv_command_line(
2866 hostnames, execution_tag,
2867 ['-P', execution_tag, '-n',
2868 _drone_manager.absolute_path(control_path)],
2869 job=self)
mbligh36768f02008-02-22 18:28:33 +00002870
jadmanski0afbb632008-06-06 21:10:57 +00002871 if not self.is_server_job():
2872 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002873
showardb2e2c322008-10-14 17:33:55 +00002874 return params
mblighe2586682008-02-29 22:45:46 +00002875
mbligh36768f02008-02-22 18:28:33 +00002876
showardc9ae1782009-01-30 01:42:37 +00002877 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002878 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002879 return True
showard0fc38302008-10-23 00:44:07 +00002880 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002881 return queue_entry.get_host().dirty
2882 return False
showard21baa452008-10-21 00:08:39 +00002883
showardc9ae1782009-01-30 01:42:37 +00002884
2885 def _should_run_verify(self, queue_entry):
2886 do_not_verify = (queue_entry.host.protection ==
2887 host_protections.Protection.DO_NOT_VERIFY)
2888 if do_not_verify:
2889 return False
2890 return self.run_verify
2891
2892
showard77182562009-06-10 00:16:05 +00002893 def get_pre_job_tasks(self, queue_entry):
2894 """
2895 Get a list of tasks to perform before the host_queue_entry
2896 may be used to run this Job (such as Cleanup & Verify).
2897
2898 @returns A list of tasks to be done to the given queue_entry before
2899 it should be considered be ready to run this job. The last
2900 task in the list calls HostQueueEntry.on_pending(), which
2901 continues the flow of the job.
2902 """
showard21baa452008-10-21 00:08:39 +00002903 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002904 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002905 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002906 if self._should_run_verify(queue_entry):
2907 tasks.append(VerifyTask(queue_entry=queue_entry))
2908 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002909 return tasks
2910
2911
showardf1ae3542009-05-11 19:26:02 +00002912 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002913 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002914 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002915 else:
showardf1ae3542009-05-11 19:26:02 +00002916 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002917 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002918 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002919 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002920
2921 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002922 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002923
2924
2925 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002926 """
2927 @returns A tuple containing a list of HostQueueEntry instances to be
2928 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002929 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002930 """
showard77182562009-06-10 00:16:05 +00002931 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002932 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002933 if atomic_group:
2934 num_entries_wanted = atomic_group.max_number_of_machines
2935 else:
2936 num_entries_wanted = self.synch_count
2937 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002938
showardf1ae3542009-05-11 19:26:02 +00002939 if num_entries_wanted > 0:
2940 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002941 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002942 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002943 params=(self.id, include_queue_entry.id)))
2944
2945 # Sort the chosen hosts by hostname before slicing.
2946 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2947 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2948 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2949 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002950
showardf1ae3542009-05-11 19:26:02 +00002951 # Sanity check. We'll only ever be called if this can be met.
2952 assert len(chosen_entries) >= self.synch_count
2953
2954 if atomic_group:
2955 # Look at any meta_host and dependency labels and pick the first
2956 # one that also specifies this atomic group. Use that label name
2957 # as the group name if possible (it is more specific).
2958 group_name = atomic_group.name
2959 for label in include_queue_entry.get_labels():
2960 if label.atomic_group_id:
2961 assert label.atomic_group_id == atomic_group.id
2962 group_name = label.name
2963 break
2964 else:
2965 group_name = ''
2966
2967 self._assign_new_group(chosen_entries, group_name=group_name)
2968 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002969
2970
showard77182562009-06-10 00:16:05 +00002971 def run_if_ready(self, queue_entry):
2972 """
2973 @returns An Agent instance to ultimately run this job if enough hosts
2974 are ready for it to run.
2975 @returns None and potentially cleans up excess hosts if this Job
2976 is not ready to run.
2977 """
showardb2e2c322008-10-14 17:33:55 +00002978 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00002979 self.stop_if_necessary()
2980 return None
mbligh36768f02008-02-22 18:28:33 +00002981
showard77182562009-06-10 00:16:05 +00002982 if queue_entry.atomic_group:
2983 return self.run_with_ready_delay(queue_entry)
2984
2985 return self.run(queue_entry)
2986
2987
2988 def run_with_ready_delay(self, queue_entry):
2989 """
2990 Start a delay to wait for more hosts to enter Pending state before
2991 launching an atomic group job. Once set, the a delay cannot be reset.
2992
2993 @param queue_entry: The HostQueueEntry object to get atomic group
2994 info from and pass to run_if_ready when the delay is up.
2995
2996 @returns An Agent to run the job as appropriate or None if a delay
2997 has already been set.
2998 """
2999 assert queue_entry.job_id == self.id
3000 assert queue_entry.atomic_group
3001 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3002 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3003 over_max_threshold = (self._pending_count() >= pending_threshold)
3004 delay_expired = (self._delay_ready_task and
3005 time.time() >= self._delay_ready_task.end_time)
3006
3007 # Delay is disabled or we already have enough? Do not wait to run.
3008 if not delay or over_max_threshold or delay_expired:
3009 return self.run(queue_entry)
3010
3011 # A delay was previously scheduled.
3012 if self._delay_ready_task:
3013 return None
3014
3015 def run_job_after_delay():
3016 logging.info('Job %s done waiting for extra hosts.', self.id)
3017 return self.run(queue_entry)
3018
3019 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3020 callback=run_job_after_delay)
3021
3022 return Agent([self._delay_ready_task], num_processes=0)
3023
3024
3025 def run(self, queue_entry):
3026 """
3027 @param queue_entry: The HostQueueEntry instance calling this method.
3028 @returns An Agent instance to run this job or None if we've already
3029 been run.
3030 """
3031 if queue_entry.atomic_group and self._atomic_and_has_started():
3032 logging.error('Job.run() called on running atomic Job %d '
3033 'with HQE %s.', self.id, queue_entry)
3034 return None
showardf1ae3542009-05-11 19:26:02 +00003035 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3036 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003037
3038
showardf1ae3542009-05-11 19:26:02 +00003039 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003040 for queue_entry in queue_entries:
3041 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003042 params = self._get_autoserv_params(queue_entries)
3043 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003044 cmd=params, group_name=group_name)
3045 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003046 if self._delay_ready_task:
3047 # Cancel any pending callback that would try to run again
3048 # as we are already running.
3049 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003050
showard170873e2009-01-07 00:22:26 +00003051 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003052
3053
# Script entry point: run the scheduler's main loop (main() is defined
# elsewhere in this file).
if __name__ == '__main__':
    main()