blob: 85c37eec450f8f4bb81fca1d56a1becbca5ed0b3 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
mbligh36768f02008-02-22 18:28:33 +000024RESULTS_DIR = '.'
25AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000026DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000027AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showardd3dc1992009-04-22 21:01:40 +000040_AUTOSERV_PID_FILE = '.autoserv_execute'
41_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
42_PARSER_PID_FILE = '.parser_execute'
43
44_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
45 _PARSER_PID_FILE)
46
showard35162b02009-03-03 02:17:30 +000047# error message to leave in results dir when an autoserv process disappears
48# mysteriously
49_LOST_PROCESS_ERROR = """\
50Autoserv failed abnormally during execution for this job, probably due to a
51system error on the Autotest server. Full results may not be available. Sorry.
52"""
53
mbligh6f8bab42008-02-29 22:45:14 +000054_db = None
mbligh36768f02008-02-22 18:28:33 +000055_shutdown = False
showard170873e2009-01-07 00:22:26 +000056_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
57_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000058_testing_mode = False
showard542e8402008-09-19 20:16:18 +000059_base_url = None
showardc85c21b2008-11-24 22:17:37 +000060_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000061_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Entry point: run the scheduler, logging any escaping exception.

    SystemExit is re-raised untouched so deliberate exits keep their exit
    status.  The following bare except is intentional: anything else that
    escapes (including non-Exception errors) is logged via
    logging.exception before being re-raised.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """Parse arguments, configure module globals, and run the dispatch loop.

    Expects exactly one positional argument: the results directory.
    Mutates the module globals RESULTS_DIR, _notify_email_statuses,
    _autoserv_path, _testing_mode and _base_url, starts the status server,
    then loops on Dispatcher.tick() until _shutdown is set (see
    handle_sigint).  Exits with status 1 when the scheduler is disabled in
    the global config or [SERVER] hostname is missing.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    # Refuse to run at all unless explicitly enabled in the global config.
    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        print msg
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Site hook: falls back to the no-op dummy when no site module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Config value is a list separated by any of whitespace , ; or :.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT flips _shutdown.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Report the failure by email rather than crashing silently; cleanup
        # below still runs so the drones and DB are shut down properly.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides.

    AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME, when set,
    override the default log directory and logfile name.
    """
    env = os.environ
    target_dir = env.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    target_name = env.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=target_dir,
                                      logfile_name=target_name)
173
174
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the scheduler.

    Sets the module-level _shutdown flag, which the main dispatch loop
    polls; the process then exits after completing its current tick.
    """
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """One-time scheduler initialization before the dispatch loop starts.

    Writes the pid file, connects the module-global _db, puts Django in
    autocommit and disables the readonly connection, installs the SIGINT
    handler, and initializes the module-global _drone_manager from the
    configured drone and results hosts.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Drones are listed comma-separated in the config; both settings default
    # to localhost for single-machine installs.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv,
            appended after the standard ones.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - When True, pass --verbose to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            # Fall back to the job attached to the queue entry.
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
236
237
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
240
241
class HostScheduler(object):
    """Matches pending host queue entries to ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, label
    membership, job dependencies) from the database into in-memory dicts;
    find_eligible_host()/find_eligible_atomic_group() then consume that
    snapshot, popping hosts from _hosts_available as they are assigned so
    each host is handed out at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host id: Host} for every schedulable host: not locked,
        status NULL or 'Ready', and no active queue entry against it."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (its %s filled with id_list) and return
        {left id: set(right ids)}; flip=True swaps the column roles.
        Returns {} without touching the DB when id_list is empty."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left id: set(right ids)}; with
        flip=True the columns swap roles."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job id: set(ACL group ids)}, resolved via each job's
        owner's group memberships."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job id: set(host ids marked ineligible for that job)}."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job id: set(label ids the job depends on)}."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host id: set(ACL group ids the host belongs to)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return the label<->host mapping in both directions:
        ({label id: set(host ids)}, {host id: set(label ids)})."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for a new scheduling cycle.

        Only data relevant to pending_queue_entries' jobs and the
        currently-ready hosts is fetched.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job shares at least one ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the
        job neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found among the labels, an error is logged and one of the ids is
        returned arbitrarily.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        # set.pop() picks an arbitrary element; with >1 group this is an
        # arbitrary (but logged, above) choice.
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True when host_id passes all eligibility checks for the entry:
        ACL access, label dependencies, only_if_needed labels and atomic
        group constraints."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True when the host is known here and flagged invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host; None when it is ineligible or
        already taken this cycle."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True when the host is still available this cycle and not
        flagged invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label;
        None when no host in the label qualifies."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None when
        nothing suitable is available this cycle."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
    def __init__(self):
        # Agent objects the main loop is currently driving.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes mapping host id / queue entry id -> set of Agents;
        # maintained by _register_agent_for_ids/_unregister_agent_for_ids.
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup tasks and recover state left
        over from a previous scheduler run.

        @param recover_hosts: when True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
    def tick(self):
        """Run one iteration of the main scheduling loop: refresh drone
        state, run cleanups, process aborts/reverifies/recurring runs,
        schedule new jobs, drive agents, then flush drone actions and
        queued email."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
    def _run_cleanup(self):
        """Give both cleanup tasks a chance to run; each decides internally
        whether it is due."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        """Remove agent from agent_dict's set for every id in object_ids.

        Every id is expected to already be registered (asserted); the agent
        itself must be present in each set or set.remove raises KeyError.
        """
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)
657
658
    def add_agent(self, agent):
        """Begin tracking agent: it will be driven each tick, and is indexed
        by every host and queue entry it touches so related agents can be
        found later (see get_agents_for_entry/host_has_agent)."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000690 self._register_pidfiles()
691 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000692 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000693 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000694 self._reverify_remaining_hosts()
695 # reinitialize drones after killing orphaned processes, since they can
696 # leave around files when they die
697 _drone_manager.execute_actions()
698 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000699
showard170873e2009-01-07 00:22:26 +0000700
701 def _register_pidfiles(self):
702 # during recovery we may need to read pidfiles for both running and
703 # parsing entries
704 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000705 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000706 special_tasks = models.SpecialTask.objects.filter(is_active=True)
707 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000708 for pidfile_name in _ALL_PIDFILE_NAMES:
709 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000710 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000711 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000712
713
showarded2afea2009-07-07 20:54:07 +0000714 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
715 run_monitor = PidfileRunMonitor()
716 run_monitor.attach_to_existing_process(execution_path,
717 pidfile_name=pidfile_name)
718 if run_monitor.has_process():
719 orphans.discard(run_monitor.get_process())
720 return run_monitor, '(process %s)' % run_monitor.get_process()
721 return None, 'without process'
722
723
showardd3dc1992009-04-22 21:01:40 +0000724 def _recover_entries_with_status(self, status, orphans, pidfile_name,
725 recover_entries_fn):
726 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000727 for queue_entry in queue_entries:
728 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000729 # synchronous job we've already recovered
730 continue
showardd3dc1992009-04-22 21:01:40 +0000731 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000732 run_monitor, process_string = self._get_recovery_run_monitor(
733 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000734
showarded2afea2009-07-07 20:54:07 +0000735 logging.info('Recovering %s entry %s %s',status.lower(),
736 ', '.join(str(entry) for entry in queue_entries),
737 process_string)
showardd3dc1992009-04-22 21:01:40 +0000738 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000739
740
741 def _kill_remaining_orphan_processes(self, orphans):
742 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000743 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000744 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000745
showard170873e2009-01-07 00:22:26 +0000746
showardd3dc1992009-04-22 21:01:40 +0000747 def _recover_running_entries(self, orphans):
748 def recover_entries(job, queue_entries, run_monitor):
749 if run_monitor is not None:
showarded2afea2009-07-07 20:54:07 +0000750 queue_task = QueueTask(job=job, queue_entries=queue_entries,
751 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000752 self.add_agent(Agent(tasks=[queue_task],
753 num_processes=len(queue_entries)))
754 # else, _requeue_other_active_entries will cover this
755
756 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000757 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000758 recover_entries)
759
760
761 def _recover_gathering_entries(self, orphans):
762 def recover_entries(job, queue_entries, run_monitor):
763 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000764 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000765 self.add_agent(Agent([gather_task]))
766
767 self._recover_entries_with_status(
768 models.HostQueueEntry.Status.GATHERING,
769 orphans, _CRASHINFO_PID_FILE, recover_entries)
770
771
772 def _recover_parsing_entries(self, orphans):
773 def recover_entries(job, queue_entries, run_monitor):
774 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000775 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000776 self.add_agent(Agent([reparse_task], num_processes=0))
777
778 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
779 orphans, _PARSER_PID_FILE,
780 recover_entries)
781
782
    def _recover_all_recoverable_entries(self):
        """Recover running/gathering/parsing entries and special tasks.

        Each recovery step removes the processes it claims from the orphan
        set; whatever remains afterwards is killed, so the kill step must
        run last.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000790
showard97aed502008-11-04 02:01:24 +0000791
showarded2afea2009-07-07 20:54:07 +0000792 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000793 """\
794 Recovers all special tasks that have started running but have not
795 completed.
796 """
797
798 tasks = models.SpecialTask.objects.filter(is_active=True,
799 is_complete=False)
800 # Use ordering to force NULL queue_entry_id's to the end of the list
801 for task in tasks.order_by('-queue_entry_id'):
showarded2afea2009-07-07 20:54:07 +0000802 assert not self.host_has_agent(task.host)
showard2fe3f1d2009-07-06 20:19:11 +0000803
804 host = Host(id=task.host.id)
805 queue_entry = None
806 if task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000807 queue_entry = HostQueueEntry(id=task.queue_entry.id)
showard2fe3f1d2009-07-06 20:19:11 +0000808
showarded2afea2009-07-07 20:54:07 +0000809 run_monitor, process_string = self._get_recovery_run_monitor(
810 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
811
812 logging.info('Recovering %s %s', task, process_string)
813 self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000814
815
showarded2afea2009-07-07 20:54:07 +0000816 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000817 """\
818 Recovers a single special task.
819 """
820 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000821 agent_tasks = self._recover_verify(task, host, queue_entry,
822 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000823 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000824 agent_tasks = self._recover_repair(task, host, queue_entry,
825 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000826 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000827 agent_tasks = self._recover_cleanup(task, host, queue_entry,
828 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000829 else:
830 # Should never happen
831 logging.error(
832 "Special task id %d had invalid task %s", (task.id, task.task))
833
834 self.add_agent(Agent(agent_tasks))
835
836
showarded2afea2009-07-07 20:54:07 +0000837 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000838 """\
839 Recovers a verify task.
840 No associated queue entry: Verify host
841 With associated queue entry: Verify host, and run associated queue
842 entry
843 """
844 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000845 return [VerifyTask(host=host, task=task,
846 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000847 else:
showarded2afea2009-07-07 20:54:07 +0000848 return [VerifyTask(queue_entry=queue_entry, task=task,
849 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000850 SetEntryPendingTask(queue_entry=queue_entry)]
851
852
showarded2afea2009-07-07 20:54:07 +0000853 def _recover_repair(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000854 """\
855 Recovers a repair task.
856 Always repair host
857 """
showarded2afea2009-07-07 20:54:07 +0000858 return [RepairTask(host=host, queue_entry=queue_entry, task=task,
859 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000860
861
showarded2afea2009-07-07 20:54:07 +0000862 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000863 """\
864 Recovers a cleanup task.
865 No associated queue entry: Clean host
866 With associated queue entry: Clean host, verify host if needed, and
867 run associated queue entry
868 """
869 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000870 return [CleanupTask(host=host, task=task,
871 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000872 else:
873 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000874 task=task,
875 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000876 if queue_entry.job.should_run_verify(queue_entry):
877 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
878 agent_tasks.append(
879 SetEntryPendingTask(queue_entry=queue_entry))
880 return agent_tasks
881
882
showard170873e2009-01-07 00:22:26 +0000883 def _requeue_other_active_entries(self):
884 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000885 where='active AND NOT complete AND '
886 '(aborted OR status != "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000887
showard2fe3f1d2009-07-06 20:19:11 +0000888 message = '\n'.join(str(entry) for entry in queue_entries
889 if not self.get_agents_for_entry(entry))
890 if message:
891 email_manager.manager.enqueue_notify_email(
892 'Unrecovered active host queue entries exist',
893 message)
showard170873e2009-01-07 00:22:26 +0000894
895
showard1ff7b2e2009-05-15 23:17:18 +0000896 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000897 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000898 task=models.SpecialTask.Task.VERIFY, is_active=False,
899 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000900
showard2fe3f1d2009-07-06 20:19:11 +0000901 for task in tasks:
902 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
903 if host.locked or host.invalid or self.host_has_agent(host):
904 continue
showard6d7b2ff2009-06-10 00:16:47 +0000905
showard2fe3f1d2009-07-06 20:19:11 +0000906 logging.info('Force reverifying host %s', host.hostname)
907 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000908
909
showard170873e2009-01-07 00:22:26 +0000910 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000911 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000912 # should never happen
showarded2afea2009-07-07 20:54:07 +0000913 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000914 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000915 self._reverify_hosts_where(
916 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
917 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000918
919
jadmanski0afbb632008-06-06 21:10:57 +0000920 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000921 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000922 full_where='locked = 0 AND invalid = 0 AND ' + where
923 for host in Host.fetch(where=full_where):
924 if self.host_has_agent(host):
925 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000926 continue
showard170873e2009-01-07 00:22:26 +0000927 if print_message:
showardb18134f2009-03-20 20:52:18 +0000928 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000929 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000930 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000931
932
jadmanski0afbb632008-06-06 21:10:57 +0000933 def _recover_hosts(self):
934 # recover "Repair Failed" hosts
935 message = 'Reverifying dead host %s'
936 self._reverify_hosts_where("status = 'Repair Failed'",
937 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000938
939
showard04c82c52008-05-29 19:38:12 +0000940
showardb95b1bd2008-08-15 18:11:04 +0000941 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000942 # prioritize by job priority, then non-metahost over metahost, then FIFO
943 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000944 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000945 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000946 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000947
948
showard89f84db2009-03-12 20:39:13 +0000949 def _refresh_pending_queue_entries(self):
950 """
951 Lookup the pending HostQueueEntries and call our HostScheduler
952 refresh() method given that list. Return the list.
953
954 @returns A list of pending HostQueueEntries sorted in priority order.
955 """
showard63a34772008-08-18 19:32:50 +0000956 queue_entries = self._get_pending_queue_entries()
957 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000958 return []
showardb95b1bd2008-08-15 18:11:04 +0000959
showard63a34772008-08-18 19:32:50 +0000960 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000961
showard89f84db2009-03-12 20:39:13 +0000962 return queue_entries
963
964
965 def _schedule_atomic_group(self, queue_entry):
966 """
967 Schedule the given queue_entry on an atomic group of hosts.
968
969 Returns immediately if there are insufficient available hosts.
970
971 Creates new HostQueueEntries based off of queue_entry for the
972 scheduled hosts and starts them all running.
973 """
974 # This is a virtual host queue entry representing an entire
975 # atomic group, find a group and schedule their hosts.
976 group_hosts = self._host_scheduler.find_eligible_atomic_group(
977 queue_entry)
978 if not group_hosts:
979 return
showardcbe6f942009-06-17 19:33:49 +0000980
981 logging.info('Expanding atomic group entry %s with hosts %s',
982 queue_entry,
983 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +0000984 # The first assigned host uses the original HostQueueEntry
985 group_queue_entries = [queue_entry]
986 for assigned_host in group_hosts[1:]:
987 # Create a new HQE for every additional assigned_host.
988 new_hqe = HostQueueEntry.clone(queue_entry)
989 new_hqe.save()
990 group_queue_entries.append(new_hqe)
991 assert len(group_queue_entries) == len(group_hosts)
992 for queue_entry, host in itertools.izip(group_queue_entries,
993 group_hosts):
994 self._run_queue_entry(queue_entry, host)
995
996
997 def _schedule_new_jobs(self):
998 queue_entries = self._refresh_pending_queue_entries()
999 if not queue_entries:
1000 return
1001
showard63a34772008-08-18 19:32:50 +00001002 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001003 if (queue_entry.atomic_group_id is None or
1004 queue_entry.host_id is not None):
1005 assigned_host = self._host_scheduler.find_eligible_host(
1006 queue_entry)
1007 if assigned_host:
1008 self._run_queue_entry(queue_entry, assigned_host)
1009 else:
1010 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001011
1012
1013 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +00001014 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
1015 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001016
1017
jadmanski0afbb632008-06-06 21:10:57 +00001018 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001019 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
1020 for agent in self.get_agents_for_entry(entry):
1021 agent.abort()
1022 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001023
1024
showard324bf812009-01-20 23:23:38 +00001025 def _can_start_agent(self, agent, num_started_this_cycle,
1026 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001027 # always allow zero-process agents to run
1028 if agent.num_processes == 0:
1029 return True
1030 # don't allow any nonzero-process agents to run after we've reached a
1031 # limit (this avoids starvation of many-process agents)
1032 if have_reached_limit:
1033 return False
1034 # total process throttling
showard324bf812009-01-20 23:23:38 +00001035 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001036 return False
1037 # if a single agent exceeds the per-cycle throttling, still allow it to
1038 # run when it's the first agent in the cycle
1039 if num_started_this_cycle == 0:
1040 return True
1041 # per-cycle throttling
1042 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001043 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001044 return False
1045 return True
1046
1047
jadmanski0afbb632008-06-06 21:10:57 +00001048 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001049 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001050 have_reached_limit = False
1051 # iterate over copy, so we can remove agents during iteration
1052 for agent in list(self._agents):
1053 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001054 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001055 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001056 continue
1057 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001058 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001059 have_reached_limit):
1060 have_reached_limit = True
1061 continue
showard4c5374f2008-09-04 17:02:56 +00001062 num_started_this_cycle += agent.num_processes
1063 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001064 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001065 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001066
1067
showard29f7cd22009-04-29 21:16:24 +00001068 def _process_recurring_runs(self):
1069 recurring_runs = models.RecurringRun.objects.filter(
1070 start_date__lte=datetime.datetime.now())
1071 for rrun in recurring_runs:
1072 # Create job from template
1073 job = rrun.job
1074 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001075 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001076
1077 host_objects = info['hosts']
1078 one_time_hosts = info['one_time_hosts']
1079 metahost_objects = info['meta_hosts']
1080 dependencies = info['dependencies']
1081 atomic_group = info['atomic_group']
1082
1083 for host in one_time_hosts or []:
1084 this_host = models.Host.create_one_time_host(host.hostname)
1085 host_objects.append(this_host)
1086
1087 try:
1088 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001089 options=options,
showard29f7cd22009-04-29 21:16:24 +00001090 host_objects=host_objects,
1091 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001092 atomic_group=atomic_group)
1093
1094 except Exception, ex:
1095 logging.exception(ex)
1096 #TODO send email
1097
1098 if rrun.loop_count == 1:
1099 rrun.delete()
1100 else:
1101 if rrun.loop_count != 0: # if not infinite loop
1102 # calculate new start_date
1103 difference = datetime.timedelta(seconds=rrun.loop_period)
1104 rrun.start_date = rrun.start_date + difference
1105 rrun.loop_count -= 1
1106 rrun.save()
1107
1108
showard170873e2009-01-07 00:22:26 +00001109class PidfileRunMonitor(object):
1110 """
1111 Client must call either run() to start a new process or
1112 attach_to_existing_process().
1113 """
mbligh36768f02008-02-22 18:28:33 +00001114
showard170873e2009-01-07 00:22:26 +00001115 class _PidfileException(Exception):
1116 """
1117 Raised when there's some unexpected behavior with the pid file, but only
1118 used internally (never allowed to escape this class).
1119 """
mbligh36768f02008-02-22 18:28:33 +00001120
1121
showard170873e2009-01-07 00:22:26 +00001122 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001123 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001124 self._start_time = None
1125 self.pidfile_id = None
1126 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001127
1128
showard170873e2009-01-07 00:22:26 +00001129 def _add_nice_command(self, command, nice_level):
1130 if not nice_level:
1131 return command
1132 return ['nice', '-n', str(nice_level)] + command
1133
1134
1135 def _set_start_time(self):
1136 self._start_time = time.time()
1137
1138
1139 def run(self, command, working_directory, nice_level=None, log_file=None,
1140 pidfile_name=None, paired_with_pidfile=None):
1141 assert command is not None
1142 if nice_level is not None:
1143 command = ['nice', '-n', str(nice_level)] + command
1144 self._set_start_time()
1145 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001146 command, working_directory, pidfile_name=pidfile_name,
1147 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001148
1149
showarded2afea2009-07-07 20:54:07 +00001150 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001151 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001152 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001153 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001154 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001155 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001156
1157
jadmanski0afbb632008-06-06 21:10:57 +00001158 def kill(self):
showard170873e2009-01-07 00:22:26 +00001159 if self.has_process():
1160 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001161
mbligh36768f02008-02-22 18:28:33 +00001162
showard170873e2009-01-07 00:22:26 +00001163 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001164 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001165 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001166
1167
showard170873e2009-01-07 00:22:26 +00001168 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001169 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001170 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001171 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001172
1173
showard170873e2009-01-07 00:22:26 +00001174 def _read_pidfile(self, use_second_read=False):
1175 assert self.pidfile_id is not None, (
1176 'You must call run() or attach_to_existing_process()')
1177 contents = _drone_manager.get_pidfile_contents(
1178 self.pidfile_id, use_second_read=use_second_read)
1179 if contents.is_invalid():
1180 self._state = drone_manager.PidfileContents()
1181 raise self._PidfileException(contents)
1182 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001183
1184
showard21baa452008-10-21 00:08:39 +00001185 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001186 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1187 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001188 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001189 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001190
1191
1192 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001193 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001194 return
mblighbb421852008-03-11 22:36:16 +00001195
showard21baa452008-10-21 00:08:39 +00001196 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001197
showard170873e2009-01-07 00:22:26 +00001198 if self._state.process is None:
1199 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001200 return
mbligh90a549d2008-03-25 23:52:34 +00001201
showard21baa452008-10-21 00:08:39 +00001202 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001203 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001204 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001205 return
mbligh90a549d2008-03-25 23:52:34 +00001206
showard170873e2009-01-07 00:22:26 +00001207 # pid but no running process - maybe process *just* exited
1208 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001209 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001210 # autoserv exited without writing an exit code
1211 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001212 self._handle_pidfile_error(
1213 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001214
showard21baa452008-10-21 00:08:39 +00001215
1216 def _get_pidfile_info(self):
1217 """\
1218 After completion, self._state will contain:
1219 pid=None, exit_status=None if autoserv has not yet run
1220 pid!=None, exit_status=None if autoserv is running
1221 pid!=None, exit_status!=None if autoserv has completed
1222 """
1223 try:
1224 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001225 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001226 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001227
1228
showard170873e2009-01-07 00:22:26 +00001229 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001230 """\
1231 Called when no pidfile is found or no pid is in the pidfile.
1232 """
showard170873e2009-01-07 00:22:26 +00001233 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001234 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1235 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001236 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001237 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001238
1239
showard35162b02009-03-03 02:17:30 +00001240 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001241 """\
1242 Called when autoserv has exited without writing an exit status,
1243 or we've timed out waiting for autoserv to write a pid to the
1244 pidfile. In either case, we just return failure and the caller
1245 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001246
showard170873e2009-01-07 00:22:26 +00001247 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001248 """
1249 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001250 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001251 self._state.exit_status = 1
1252 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001253
1254
jadmanski0afbb632008-06-06 21:10:57 +00001255 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001256 self._get_pidfile_info()
1257 return self._state.exit_status
1258
1259
1260 def num_tests_failed(self):
1261 self._get_pidfile_info()
1262 assert self._state.num_tests_failed is not None
1263 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001264
1265
mbligh36768f02008-02-22 18:28:33 +00001266class Agent(object):
showard77182562009-06-10 00:16:05 +00001267 """
1268 An agent for use by the Dispatcher class to perform a sequence of tasks.
1269
1270 The following methods are required on all task objects:
1271 poll() - Called periodically to let the task check its status and
1272 update its internal state. If the task succeeded.
1273 is_done() - Returns True if the task is finished.
1274 abort() - Called when an abort has been requested. The task must
1275 set its aborted attribute to True if it actually aborted.
1276
1277 The following attributes are required on all task objects:
1278 aborted - bool, True if this task was aborted.
1279 failure_tasks - A sequence of tasks to be run using a new Agent
1280 by the dispatcher should this task fail.
1281 success - bool, True if this task succeeded.
1282 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1283 host_ids - A sequence of Host ids this task represents.
1284
1285 The following attribute is written to all task objects:
1286 agent - A reference to the Agent instance that the task has been
1287 added to.
1288 """
1289
1290
showard170873e2009-01-07 00:22:26 +00001291 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001292 """
1293 @param tasks: A list of tasks as described in the class docstring.
1294 @param num_processes: The number of subprocesses the Agent represents.
1295 This is used by the Dispatcher for managing the load on the
1296 system. Defaults to 1.
1297 """
jadmanski0afbb632008-06-06 21:10:57 +00001298 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001299 self.queue = None
showard77182562009-06-10 00:16:05 +00001300 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001301 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001302 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001303
showard170873e2009-01-07 00:22:26 +00001304 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1305 for task in tasks)
1306 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1307
showardd3dc1992009-04-22 21:01:40 +00001308 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001309 for task in tasks:
1310 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001311
1312
showardd3dc1992009-04-22 21:01:40 +00001313 def _clear_queue(self):
1314 self.queue = Queue.Queue(0)
1315
1316
showard170873e2009-01-07 00:22:26 +00001317 def _union_ids(self, id_lists):
1318 return set(itertools.chain(*id_lists))
1319
1320
jadmanski0afbb632008-06-06 21:10:57 +00001321 def add_task(self, task):
1322 self.queue.put_nowait(task)
1323 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001324
1325
jadmanski0afbb632008-06-06 21:10:57 +00001326 def tick(self):
showard21baa452008-10-21 00:08:39 +00001327 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001328 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001329 self.active_task.poll()
1330 if not self.active_task.is_done():
1331 return
1332 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001333
1334
jadmanski0afbb632008-06-06 21:10:57 +00001335 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001336 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001337 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001338 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001339 if not self.active_task.success:
1340 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001341 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001342
jadmanski0afbb632008-06-06 21:10:57 +00001343 if not self.is_done():
1344 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001345
1346
jadmanski0afbb632008-06-06 21:10:57 +00001347 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001348 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001349 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1350 # get reset.
1351 new_agent = Agent(self.active_task.failure_tasks)
1352 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001353
mblighe2586682008-02-29 22:45:46 +00001354
showard4c5374f2008-09-04 17:02:56 +00001355 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001356 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001357
1358
jadmanski0afbb632008-06-06 21:10:57 +00001359 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001360 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001361
1362
showardd3dc1992009-04-22 21:01:40 +00001363 def abort(self):
showard08a36412009-05-05 01:01:13 +00001364 # abort tasks until the queue is empty or a task ignores the abort
1365 while not self.is_done():
1366 if not self.active_task:
1367 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001368 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001369 if not self.active_task.aborted:
1370 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001371 return
1372 self.active_task = None
1373
showardd3dc1992009-04-22 21:01:40 +00001374
showard77182562009-06-10 00:16:05 +00001375class DelayedCallTask(object):
1376 """
1377 A task object like AgentTask for an Agent to run that waits for the
1378 specified amount of time to have elapsed before calling the supplied
1379 callback once and finishing. If the callback returns anything, it is
1380 assumed to be a new Agent instance and will be added to the dispatcher.
1381
1382 @attribute end_time: The absolute posix time after which this task will
1383 call its callback when it is polled and be finished.
1384
1385 Also has all attributes required by the Agent class.
1386 """
1387 def __init__(self, delay_seconds, callback, now_func=None):
1388 """
1389 @param delay_seconds: The delay in seconds from now that this task
1390 will call the supplied callback and be done.
1391 @param callback: A callable to be called by this task once after at
1392 least delay_seconds time has elapsed. It must return None
1393 or a new Agent instance.
1394 @param now_func: A time.time like function. Default: time.time.
1395 Used for testing.
1396 """
1397 assert delay_seconds > 0
1398 assert callable(callback)
1399 if not now_func:
1400 now_func = time.time
1401 self._now_func = now_func
1402 self._callback = callback
1403
1404 self.end_time = self._now_func() + delay_seconds
1405
1406 # These attributes are required by Agent.
1407 self.aborted = False
1408 self.failure_tasks = ()
1409 self.host_ids = ()
1410 self.success = False
1411 self.queue_entry_ids = ()
1412 # This is filled in by Agent.add_task().
1413 self.agent = None
1414
1415
1416 def poll(self):
1417 if self._callback and self._now_func() >= self.end_time:
1418 new_agent = self._callback()
1419 if new_agent:
1420 self.agent.dispatcher.add_agent(new_agent)
1421 self._callback = None
1422 self.success = True
1423
1424
1425 def is_done(self):
1426 return not self._callback
1427
1428
1429 def abort(self):
1430 self.aborted = True
1431 self._callback = None
1432
1433
mbligh36768f02008-02-22 18:28:33 +00001434class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001435 def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
1436 pidfile_name=None, paired_with_pidfile=None,
1437 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001438 self.done = False
1439 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001440 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001441 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001442 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001443 self.monitor = recover_run_monitor
1444 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001445 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001446 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001447 self.queue_entry_ids = []
1448 self.host_ids = []
1449 self.log_file = None
1450
1451
1452 def _set_ids(self, host=None, queue_entries=None):
1453 if queue_entries and queue_entries != [None]:
1454 self.host_ids = [entry.host.id for entry in queue_entries]
1455 self.queue_entry_ids = [entry.id for entry in queue_entries]
1456 else:
1457 assert host
1458 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001459
1460
jadmanski0afbb632008-06-06 21:10:57 +00001461 def poll(self):
showard08a36412009-05-05 01:01:13 +00001462 if not self.started:
1463 self.start()
1464 self.tick()
1465
1466
1467 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001468 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001469 exit_code = self.monitor.exit_code()
1470 if exit_code is None:
1471 return
1472 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001473 else:
1474 success = False
mbligh36768f02008-02-22 18:28:33 +00001475
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001477
1478
jadmanski0afbb632008-06-06 21:10:57 +00001479 def is_done(self):
1480 return self.done
mbligh36768f02008-02-22 18:28:33 +00001481
1482
jadmanski0afbb632008-06-06 21:10:57 +00001483 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001484 if self.done:
1485 return
jadmanski0afbb632008-06-06 21:10:57 +00001486 self.done = True
1487 self.success = success
1488 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001489
1490
jadmanski0afbb632008-06-06 21:10:57 +00001491 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001492 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001493
mbligh36768f02008-02-22 18:28:33 +00001494
jadmanski0afbb632008-06-06 21:10:57 +00001495 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001496 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001497 _drone_manager.copy_to_results_repository(
1498 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001499
1500
jadmanski0afbb632008-06-06 21:10:57 +00001501 def epilog(self):
1502 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001503
1504
jadmanski0afbb632008-06-06 21:10:57 +00001505 def start(self):
1506 assert self.agent
1507
1508 if not self.started:
1509 self.prolog()
1510 self.run()
1511
1512 self.started = True
1513
1514
1515 def abort(self):
1516 if self.monitor:
1517 self.monitor.kill()
1518 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001519 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001520 self.cleanup()
1521
1522
showarded2afea2009-07-07 20:54:07 +00001523 def _get_consistent_execution_path(self, execution_entries):
1524 first_execution_path = execution_entries[0].execution_path()
1525 for execution_entry in execution_entries[1:]:
1526 assert execution_entry.execution_path() == first_execution_path, (
1527 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1528 execution_entry,
1529 first_execution_path,
1530 execution_entries[0]))
1531 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001532
1533
showarded2afea2009-07-07 20:54:07 +00001534 def _copy_results(self, execution_entries, use_monitor=None):
1535 """
1536 @param execution_entries: list of objects with execution_path() method
1537 """
1538 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001539 if use_monitor is None:
1540 assert self.monitor
1541 use_monitor = self.monitor
1542 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001543 execution_path = self._get_consistent_execution_path(execution_entries)
1544 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001545 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001546 results_path)
showardde634ee2009-01-30 01:44:24 +00001547
showarda1e74b32009-05-12 17:32:04 +00001548
1549 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001550 reparse_task = FinalReparseTask(queue_entries)
1551 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1552
1553
showarda1e74b32009-05-12 17:32:04 +00001554 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1555 self._copy_results(queue_entries, use_monitor)
1556 self._parse_results(queue_entries)
1557
1558
showardd3dc1992009-04-22 21:01:40 +00001559 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001560 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001561 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001562 self.monitor = PidfileRunMonitor()
1563 self.monitor.run(self.cmd, self._working_directory,
1564 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001565 log_file=self.log_file,
1566 pidfile_name=pidfile_name,
1567 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001568
1569
showardd9205182009-04-27 20:09:55 +00001570class TaskWithJobKeyvals(object):
1571 """AgentTask mixin providing functionality to help with job keyval files."""
1572 _KEYVAL_FILE = 'keyval'
1573 def _format_keyval(self, key, value):
1574 return '%s=%s' % (key, value)
1575
1576
1577 def _keyval_path(self):
1578 """Subclasses must override this"""
1579 raise NotImplemented
1580
1581
1582 def _write_keyval_after_job(self, field, value):
1583 assert self.monitor
1584 if not self.monitor.has_process():
1585 return
1586 _drone_manager.write_lines_to_file(
1587 self._keyval_path(), [self._format_keyval(field, value)],
1588 paired_with_process=self.monitor.get_process())
1589
1590
1591 def _job_queued_keyval(self, job):
1592 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1593
1594
1595 def _write_job_finished(self):
1596 self._write_keyval_after_job("job_finished", int(time.time()))
1597
1598
showarded2afea2009-07-07 20:54:07 +00001599class SpecialAgentTask(AgentTask):
1600 """
1601 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1602 """
1603
1604 TASK_TYPE = None
1605 host = None
1606 queue_entry = None
1607
1608 def __init__(self, task, extra_command_args, **kwargs):
1609 assert self.host
1610 assert (self.TASK_TYPE is not None,
1611 'self.TASK_TYPE must be overridden')
1612 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001613 if task:
1614 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001615 self._extra_command_args = extra_command_args
1616 super(SpecialAgentTask, self).__init__(**kwargs)
1617
1618
1619 def prolog(self):
1620 super(SpecialAgentTask, self).prolog()
1621 self.task = models.SpecialTask.prepare(self, self.task)
1622 self.cmd = _autoserv_command_line(self.host.hostname,
1623 self._extra_command_args,
1624 queue_entry=self.queue_entry)
1625 self._working_directory = self.task.execution_path()
1626 self.task.activate()
1627
1628
showardb6681aa2009-07-08 21:15:00 +00001629 def cleanup(self):
1630 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001631
1632 # self.task can be None if a SpecialAgentTask is aborted before the
1633 # prolog runs
1634 if self.task:
1635 self.task.finish()
1636
1637 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001638 self._copy_results([self.task])
1639
1640
class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
    """Runs 'autoserv -R' to repair a host, optionally failing a queue entry."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, host, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """\
        host: the Host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        task: existing models.SpecialTask row to attach to, if any.
        recover_run_monitor: monitor to adopt when recovering this task.
        """
        # look up the host's protection level and convert it to the
        # attribute-name form autoserv expects on the command line
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = None
        # recovery code can pass a HQE that's already been requeued. for a
        # metahost, that means the host has been unassigned. in that case,
        # ignore the HQE.
        hqe_still_assigned_to_this_host = (queue_entry and queue_entry.host
                                           and queue_entry.host.id == host.id)
        if hqe_still_assigned_to_this_host:
            self.queue_entry = queue_entry

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # put the entry back in the queue while the repair runs
        if self.queue_entry:
            self.queue_entry.requeue()


    def _keyval_path(self):
        # keyvals live in this repair task's own execution directory
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """
        Fail self.queue_entry for this host, copying the repair logs into
        the entry's results directory (and parsing them if the job asks).
        Skipped for metahost entries and entries no longer 'Queued'.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != 'Queued':
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self._working_directory + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001721
1722
showarded2afea2009-07-07 20:54:07 +00001723class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001724 def epilog(self):
1725 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001726 should_copy_results = (self.queue_entry and not self.success
1727 and not self.queue_entry.meta_host)
1728 if should_copy_results:
1729 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001730 log_name = os.path.basename(self.task.execution_path())
1731 source = os.path.join(self.task.execution_path(), 'debug',
1732 'autoserv.DEBUG')
1733 destination = os.path.join(self.queue_entry.execution_path(),
1734 log_name)
showard170873e2009-01-07 00:22:26 +00001735 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001736 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001737 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001738
1739
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host, repairing it on failure."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, queue_entry=None, host=None, task=None,
                 recover_run_monitor=None):
        """
        Exactly one of queue_entry or host must be supplied.  A RepairTask
        for the host is registered as the failure task.
        """
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(
                task, ['-v'], failure_tasks=failure_tasks,
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        queued_verifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False)
        queued_verifies = queued_verifies.exclude(id=self.task.id)
        queued_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        # on failure, the RepairTask registered in __init__ takes over
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001780
1781
showardb5626452009-06-30 01:57:28 +00001782class CleanupHostsMixin(object):
1783 def _reboot_hosts(self, job, queue_entries, final_success,
1784 num_tests_failed):
1785 reboot_after = job.reboot_after
1786 do_reboot = (
1787 # always reboot after aborted jobs
1788 self._final_status == models.HostQueueEntry.Status.ABORTED
1789 or reboot_after == models.RebootAfter.ALWAYS
1790 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1791 and final_success and num_tests_failed == 0))
1792
1793 for queue_entry in queue_entries:
1794 if do_reboot:
1795 # don't pass the queue entry to the CleanupTask. if the cleanup
1796 # fails, the job doesn't care -- it's over.
1797 cleanup_task = CleanupTask(host=queue_entry.host)
1798 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1799 else:
1800 queue_entry.host.set_status('Ready')
1801
1802
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    Task that runs the main autoserv process for a job's queue entries,
    writing the job-level and per-host keyval files and handing off to
    GatherLogsTask (or host cleanup) when the process finishes.
    """
    def __init__(self, job, queue_entries, cmd=None, group_name='',
                 recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(
                cmd, self._execution_path(),
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job-level keyvals sit at the top of the execution directory
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        # attach the rendered keyvals as a file to this job's execution
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # per-host keyvals (platform and labels) go under host_keyvals/
        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_path(self):
        # all entries in a group share one execution path; use the first
        return self.queue_entries[0].execution_path()


    def prolog(self):
        # record when the job was queued, plus the host group name if any
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        # mark each entry and its host as running and the host as dirty
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        # leave a marker file explaining that autoserv's process was lost
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Wrap up after autoserv exits (or is found to be lost/aborted)."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            # no process ever ran, so no logs to gather; decide reboots now
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
1935 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001936
1937
showardd3dc1992009-04-22 21:01:40 +00001938class PostJobTask(AgentTask):
1939 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001940 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001941 self._queue_entries = queue_entries
1942 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001943
showarded2afea2009-07-07 20:54:07 +00001944 self._execution_path = self._get_consistent_execution_path(
1945 queue_entries)
1946 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001947 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001948 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001949 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1950
1951 if _testing_mode:
1952 command = 'true'
1953 else:
1954 command = self._generate_command(self._results_dir)
1955
showarded2afea2009-07-07 20:54:07 +00001956 super(PostJobTask, self).__init__(
1957 cmd=command, working_directory=self._execution_path,
1958 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001959
showarded2afea2009-07-07 20:54:07 +00001960 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001961 self._final_status = self._determine_final_status()
1962
1963
1964 def _generate_command(self, results_dir):
1965 raise NotImplementedError('Subclasses must override this')
1966
1967
1968 def _job_was_aborted(self):
1969 was_aborted = None
1970 for queue_entry in self._queue_entries:
1971 queue_entry.update_from_database()
1972 if was_aborted is None: # first queue entry
1973 was_aborted = bool(queue_entry.aborted)
1974 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1975 email_manager.manager.enqueue_notify_email(
1976 'Inconsistent abort state',
1977 'Queue entries have inconsistent abort state: ' +
1978 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1979 # don't crash here, just assume true
1980 return True
1981 return was_aborted
1982
1983
1984 def _determine_final_status(self):
1985 if self._job_was_aborted():
1986 return models.HostQueueEntry.Status.ABORTED
1987
1988 # we'll use a PidfileRunMonitor to read the autoserv exit status
1989 if self._autoserv_monitor.exit_code() == 0:
1990 return models.HostQueueEntry.Status.COMPLETED
1991 return models.HostQueueEntry.Status.FAILED
1992
1993
1994 def run(self):
showard5add1c82009-05-26 19:27:46 +00001995 # make sure we actually have results to work with.
1996 # this should never happen in normal operation.
1997 if not self._autoserv_monitor.has_process():
1998 email_manager.manager.enqueue_notify_email(
1999 'No results in post-job task',
2000 'No results in post-job task at %s' %
2001 self._autoserv_monitor.pidfile_id)
2002 self.finished(False)
2003 return
2004
2005 super(PostJobTask, self).run(
2006 pidfile_name=self._pidfile_name,
2007 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002008
2009
2010 def _set_all_statuses(self, status):
2011 for queue_entry in self._queue_entries:
2012 queue_entry.set_status(status)
2013
2014
2015 def abort(self):
2016 # override AgentTask.abort() to avoid killing the process and ending
2017 # the task. post-job tasks continue when the job is aborted.
2018 pass
2019
2020
showardb5626452009-06-30 01:57:28 +00002021class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002022 """
2023 Task responsible for
2024 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2025 * copying logs to the results repository
2026 * spawning CleanupTasks for hosts, if necessary
2027 * spawning a FinalReparseTask for the job
2028 """
showarded2afea2009-07-07 20:54:07 +00002029 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002030 self._job = job
2031 super(GatherLogsTask, self).__init__(
2032 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002033 logfile_name='.collect_crashinfo.log',
2034 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002035 self._set_ids(queue_entries=queue_entries)
2036
2037
2038 def _generate_command(self, results_dir):
2039 host_list = ','.join(queue_entry.host.hostname
2040 for queue_entry in self._queue_entries)
2041 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2042 '-r', results_dir]
2043
2044
2045 def prolog(self):
2046 super(GatherLogsTask, self).prolog()
2047 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2048
2049
showardd3dc1992009-04-22 21:01:40 +00002050 def epilog(self):
2051 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00002052 if self._autoserv_monitor.has_process():
2053 self._copy_and_parse_results(self._queue_entries,
2054 use_monitor=self._autoserv_monitor)
showardb5626452009-06-30 01:57:28 +00002055
2056 final_success = (
2057 self._final_status == models.HostQueueEntry.Status.COMPLETED)
2058 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2059 self._reboot_hosts(self._job, self._queue_entries, final_success,
2060 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002061
2062
showard0bbfc212009-04-29 21:06:13 +00002063 def run(self):
showard597bfd32009-05-08 18:22:50 +00002064 autoserv_exit_code = self._autoserv_monitor.exit_code()
2065 # only run if Autoserv exited due to some signal. if we have no exit
2066 # code, assume something bad (and signal-like) happened.
2067 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002068 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002069 else:
2070 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002071
2072
showard8fe93b52008-11-18 17:53:22 +00002073class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002074 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2075
2076
2077 def __init__(self, host=None, queue_entry=None, task=None,
2078 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002079 assert bool(host) ^ bool(queue_entry)
2080 if queue_entry:
2081 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002082 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002083 self.host = host
showard170873e2009-01-07 00:22:26 +00002084
showarde788ea62008-11-17 21:02:47 +00002085 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002086 super(CleanupTask, self).__init__(
2087 task, ['--cleanup'], failure_tasks=[repair_task],
2088 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002089
2090 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002091
mblighd5c95802008-03-05 00:33:46 +00002092
jadmanski0afbb632008-06-06 21:10:57 +00002093 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002094 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002095 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002096 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002097
mblighd5c95802008-03-05 00:33:46 +00002098
showard21baa452008-10-21 00:08:39 +00002099 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002100 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002101
showard21baa452008-10-21 00:08:39 +00002102 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002103 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002104 self.host.update_field('dirty', 0)
2105
2106
showardd3dc1992009-04-22 21:01:40 +00002107class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002108 _num_running_parses = 0
2109
showarded2afea2009-07-07 20:54:07 +00002110 def __init__(self, queue_entries, recover_run_monitor=None):
2111 super(FinalReparseTask, self).__init__(
2112 queue_entries, pidfile_name=_PARSER_PID_FILE,
2113 logfile_name='.parse.log',
2114 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002115 # don't use _set_ids, since we don't want to set the host_ids
2116 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002117 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002118
showard97aed502008-11-04 02:01:24 +00002119
2120 @classmethod
2121 def _increment_running_parses(cls):
2122 cls._num_running_parses += 1
2123
2124
2125 @classmethod
2126 def _decrement_running_parses(cls):
2127 cls._num_running_parses -= 1
2128
2129
2130 @classmethod
2131 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002132 return (cls._num_running_parses <
2133 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002134
2135
2136 def prolog(self):
2137 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002138 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002139
2140
2141 def epilog(self):
2142 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002143 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002144
2145
showardd3dc1992009-04-22 21:01:40 +00002146 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002147 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002148 results_dir]
showard97aed502008-11-04 02:01:24 +00002149
2150
showard08a36412009-05-05 01:01:13 +00002151 def tick(self):
2152 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002153 # and we can, at which point we revert to default behavior
2154 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002155 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002156 else:
2157 self._try_starting_parse()
2158
2159
2160 def run(self):
2161 # override run() to not actually run unless we can
2162 self._try_starting_parse()
2163
2164
2165 def _try_starting_parse(self):
2166 if not self._can_run_new_parse():
2167 return
showard170873e2009-01-07 00:22:26 +00002168
showard97aed502008-11-04 02:01:24 +00002169 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002170 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002171
showard97aed502008-11-04 02:01:24 +00002172 self._increment_running_parses()
2173 self._parse_started = True
2174
2175
2176 def finished(self, success):
2177 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002178 if self._parse_started:
2179 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002180
2181
showardc9ae1782009-01-30 01:42:37 +00002182class SetEntryPendingTask(AgentTask):
2183 def __init__(self, queue_entry):
2184 super(SetEntryPendingTask, self).__init__(cmd='')
2185 self._queue_entry = queue_entry
2186 self._set_ids(queue_entries=[queue_entry])
2187
2188
2189 def run(self):
2190 agent = self._queue_entry.on_pending()
2191 if agent:
2192 self.agent.dispatcher.add_agent(agent)
2193 self.finished(True)
2194
2195
showarda3c58572009-03-12 20:36:59 +00002196class DBError(Exception):
2197 """Raised by the DBObject constructor when its select fails."""
2198
2199
mbligh36768f02008-02-22 18:28:33 +00002200class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002201 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002202
2203 # Subclasses MUST override these:
2204 _table_name = ''
2205 _fields = ()
2206
showarda3c58572009-03-12 20:36:59 +00002207 # A mapping from (type, id) to the instance of the object for that
2208 # particular id. This prevents us from creating new Job() and Host()
2209 # instances for every HostQueueEntry object that we instantiate as
2210 # multiple HQEs often share the same Job.
2211 _instances_by_type_and_id = weakref.WeakValueDictionary()
2212 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002213
showarda3c58572009-03-12 20:36:59 +00002214
2215 def __new__(cls, id=None, **kwargs):
2216 """
2217 Look to see if we already have an instance for this particular type
2218 and id. If so, use it instead of creating a duplicate instance.
2219 """
2220 if id is not None:
2221 instance = cls._instances_by_type_and_id.get((cls, id))
2222 if instance:
2223 return instance
2224 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2225
2226
2227 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002228 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002229 assert self._table_name, '_table_name must be defined in your class'
2230 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002231 if not new_record:
2232 if self._initialized and not always_query:
2233 return # We've already been initialized.
2234 if id is None:
2235 id = row[0]
2236 # Tell future constructors to use us instead of re-querying while
2237 # this instance is still around.
2238 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002239
showard6ae5ea92009-02-25 00:11:51 +00002240 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002241
jadmanski0afbb632008-06-06 21:10:57 +00002242 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002243
jadmanski0afbb632008-06-06 21:10:57 +00002244 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002245 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002246
showarda3c58572009-03-12 20:36:59 +00002247 if self._initialized:
2248 differences = self._compare_fields_in_row(row)
2249 if differences:
showard7629f142009-03-27 21:02:02 +00002250 logging.warn(
2251 'initialized %s %s instance requery is updating: %s',
2252 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002253 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002254 self._initialized = True
2255
2256
2257 @classmethod
2258 def _clear_instance_cache(cls):
2259 """Used for testing, clear the internal instance cache."""
2260 cls._instances_by_type_and_id.clear()
2261
2262
showardccbd6c52009-03-21 00:10:21 +00002263 def _fetch_row_from_db(self, row_id):
2264 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2265 rows = _db.execute(sql, (row_id,))
2266 if not rows:
showard76e29d12009-04-15 21:53:10 +00002267 raise DBError("row not found (table=%s, row id=%s)"
2268 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002269 return rows[0]
2270
2271
showarda3c58572009-03-12 20:36:59 +00002272 def _assert_row_length(self, row):
2273 assert len(row) == len(self._fields), (
2274 "table = %s, row = %s/%d, fields = %s/%d" % (
2275 self.__table, row, len(row), self._fields, len(self._fields)))
2276
2277
2278 def _compare_fields_in_row(self, row):
2279 """
2280 Given a row as returned by a SELECT query, compare it to our existing
2281 in memory fields.
2282
2283 @param row - A sequence of values corresponding to fields named in
2284 The class attribute _fields.
2285
2286 @returns A dictionary listing the differences keyed by field name
2287 containing tuples of (current_value, row_value).
2288 """
2289 self._assert_row_length(row)
2290 differences = {}
2291 for field, row_value in itertools.izip(self._fields, row):
2292 current_value = getattr(self, field)
2293 if current_value != row_value:
2294 differences[field] = (current_value, row_value)
2295 return differences
showard2bab8f42008-11-12 18:15:22 +00002296
2297
2298 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002299 """
2300 Update our field attributes using a single row returned by SELECT.
2301
2302 @param row - A sequence of values corresponding to fields named in
2303 the class fields list.
2304 """
2305 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002306
showard2bab8f42008-11-12 18:15:22 +00002307 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002308 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002309 setattr(self, field, value)
2310 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002311
showard2bab8f42008-11-12 18:15:22 +00002312 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002313
mblighe2586682008-02-29 22:45:46 +00002314
showardccbd6c52009-03-21 00:10:21 +00002315 def update_from_database(self):
2316 assert self.id is not None
2317 row = self._fetch_row_from_db(self.id)
2318 self._update_fields_from_row(row)
2319
2320
jadmanski0afbb632008-06-06 21:10:57 +00002321 def count(self, where, table = None):
2322 if not table:
2323 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002324
jadmanski0afbb632008-06-06 21:10:57 +00002325 rows = _db.execute("""
2326 SELECT count(*) FROM %s
2327 WHERE %s
2328 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002329
jadmanski0afbb632008-06-06 21:10:57 +00002330 assert len(rows) == 1
2331
2332 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002333
2334
showardd3dc1992009-04-22 21:01:40 +00002335 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002336 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002337
showard2bab8f42008-11-12 18:15:22 +00002338 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002339 return
mbligh36768f02008-02-22 18:28:33 +00002340
mblighf8c624d2008-07-03 16:58:45 +00002341 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002342 _db.execute(query, (value, self.id))
2343
showard2bab8f42008-11-12 18:15:22 +00002344 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002345
2346
jadmanski0afbb632008-06-06 21:10:57 +00002347 def save(self):
2348 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002349 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002350 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002351 values = []
2352 for key in keys:
2353 value = getattr(self, key)
2354 if value is None:
2355 values.append('NULL')
2356 else:
2357 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002358 values_str = ','.join(values)
2359 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2360 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002361 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002362 # Update our id to the one the database just assigned to us.
2363 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002364
2365
jadmanski0afbb632008-06-06 21:10:57 +00002366 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002367 self._instances_by_type_and_id.pop((type(self), id), None)
2368 self._initialized = False
2369 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002370 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2371 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002372
2373
showard63a34772008-08-18 19:32:50 +00002374 @staticmethod
2375 def _prefix_with(string, prefix):
2376 if string:
2377 string = prefix + string
2378 return string
2379
2380
jadmanski0afbb632008-06-06 21:10:57 +00002381 @classmethod
showard989f25d2008-10-01 11:38:11 +00002382 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002383 """
2384 Construct instances of our class based on the given database query.
2385
2386 @yields One class instance for each row fetched.
2387 """
showard63a34772008-08-18 19:32:50 +00002388 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2389 where = cls._prefix_with(where, 'WHERE ')
2390 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002391 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002392 'joins' : joins,
2393 'where' : where,
2394 'order_by' : order_by})
2395 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002396 for row in rows:
2397 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002398
mbligh36768f02008-02-22 18:28:33 +00002399
class IneligibleHostQueue(DBObject):
    """Row wrapper for the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002403
2404
showard89f84db2009-03-12 20:39:13 +00002405class AtomicGroup(DBObject):
2406 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002407 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2408 'invalid')
showard89f84db2009-03-12 20:39:13 +00002409
2410
showard989f25d2008-10-01 11:38:11 +00002411class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002412 _table_name = 'labels'
2413 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002414 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002415
2416
showard6157c632009-07-06 20:19:31 +00002417 def __repr__(self):
2418 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2419 self.name, self.id, self.atomic_group_id)
2420
2421
mbligh36768f02008-02-22 18:28:33 +00002422class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002423 _table_name = 'hosts'
2424 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2425 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2426
2427
jadmanski0afbb632008-06-06 21:10:57 +00002428 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002429 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002430 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002431
2432
showard170873e2009-01-07 00:22:26 +00002433 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002434 """
showard170873e2009-01-07 00:22:26 +00002435 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002436 """
2437 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002438 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002439 FROM labels
2440 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002441 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002442 ORDER BY labels.name
2443 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002444 platform = None
2445 all_labels = []
2446 for label_name, is_platform in rows:
2447 if is_platform:
2448 platform = label_name
2449 all_labels.append(label_name)
2450 return platform, all_labels
2451
2452
showard2fe3f1d2009-07-06 20:19:11 +00002453 def reverify_tasks(self):
2454 cleanup_task = CleanupTask(host=self)
2455 verify_task = VerifyTask(host=self)
2456
showard6d7b2ff2009-06-10 00:16:47 +00002457 # just to make sure this host does not get taken away
showard2fe3f1d2009-07-06 20:19:11 +00002458 self.set_status('Cleaning')
2459 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002460
2461
showard54c1ea92009-05-20 00:32:58 +00002462 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2463
2464
2465 @classmethod
2466 def cmp_for_sort(cls, a, b):
2467 """
2468 A comparison function for sorting Host objects by hostname.
2469
2470 This strips any trailing numeric digits, ignores leading 0s and
2471 compares hostnames by the leading name and the trailing digits as a
2472 number. If both hostnames do not match this pattern, they are simply
2473 compared as lower case strings.
2474
2475 Example of how hostnames will be sorted:
2476
2477 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2478
2479 This hopefully satisfy most people's hostname sorting needs regardless
2480 of their exact naming schemes. Nobody sane should have both a host10
2481 and host010 (but the algorithm works regardless).
2482 """
2483 lower_a = a.hostname.lower()
2484 lower_b = b.hostname.lower()
2485 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2486 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2487 if match_a and match_b:
2488 name_a, number_a_str = match_a.groups()
2489 name_b, number_b_str = match_b.groups()
2490 number_a = int(number_a_str.lstrip('0'))
2491 number_b = int(number_b_str.lstrip('0'))
2492 result = cmp((name_a, number_a), (name_b, number_b))
2493 if result == 0 and lower_a != lower_b:
2494 # If they compared equal above but the lower case names are
2495 # indeed different, don't report equality. abc012 != abc12.
2496 return cmp(lower_a, lower_b)
2497 return result
2498 else:
2499 return cmp(lower_a, lower_b)
2500
2501
mbligh36768f02008-02-22 18:28:33 +00002502class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002503 _table_name = 'host_queue_entries'
2504 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002505 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002506 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002507
2508
showarda3c58572009-03-12 20:36:59 +00002509 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002510 assert id or row
showarda3c58572009-03-12 20:36:59 +00002511 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002512 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002513
jadmanski0afbb632008-06-06 21:10:57 +00002514 if self.host_id:
2515 self.host = Host(self.host_id)
2516 else:
2517 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002518
showard77182562009-06-10 00:16:05 +00002519 if self.atomic_group_id:
2520 self.atomic_group = AtomicGroup(self.atomic_group_id,
2521 always_query=False)
2522 else:
2523 self.atomic_group = None
2524
showard170873e2009-01-07 00:22:26 +00002525 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002526 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002527
2528
showard89f84db2009-03-12 20:39:13 +00002529 @classmethod
2530 def clone(cls, template):
2531 """
2532 Creates a new row using the values from a template instance.
2533
2534 The new instance will not exist in the database or have a valid
2535 id attribute until its save() method is called.
2536 """
2537 assert isinstance(template, cls)
2538 new_row = [getattr(template, field) for field in cls._fields]
2539 clone = cls(row=new_row, new_record=True)
2540 clone.id = None
2541 return clone
2542
2543
showardc85c21b2008-11-24 22:17:37 +00002544 def _view_job_url(self):
2545 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2546
2547
showardf1ae3542009-05-11 19:26:02 +00002548 def get_labels(self):
2549 """
2550 Get all labels associated with this host queue entry (either via the
2551 meta_host or as a job dependency label). The labels yielded are not
2552 guaranteed to be unique.
2553
2554 @yields Label instances associated with this host_queue_entry.
2555 """
2556 if self.meta_host:
2557 yield Label(id=self.meta_host, always_query=False)
2558 labels = Label.fetch(
2559 joins="JOIN jobs_dependency_labels AS deps "
2560 "ON (labels.id = deps.label_id)",
2561 where="deps.job_id = %d" % self.job.id)
2562 for label in labels:
2563 yield label
2564
2565
jadmanski0afbb632008-06-06 21:10:57 +00002566 def set_host(self, host):
2567 if host:
2568 self.queue_log_record('Assigning host ' + host.hostname)
2569 self.update_field('host_id', host.id)
2570 self.update_field('active', True)
2571 self.block_host(host.id)
2572 else:
2573 self.queue_log_record('Releasing host')
2574 self.unblock_host(self.host.id)
2575 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002576
jadmanski0afbb632008-06-06 21:10:57 +00002577 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002578
2579
jadmanski0afbb632008-06-06 21:10:57 +00002580 def get_host(self):
2581 return self.host
mbligh36768f02008-02-22 18:28:33 +00002582
2583
jadmanski0afbb632008-06-06 21:10:57 +00002584 def queue_log_record(self, log_line):
2585 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002586 _drone_manager.write_lines_to_file(self.queue_log_path,
2587 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002588
2589
jadmanski0afbb632008-06-06 21:10:57 +00002590 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002591 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002592 row = [0, self.job.id, host_id]
2593 block = IneligibleHostQueue(row=row, new_record=True)
2594 block.save()
mblighe2586682008-02-29 22:45:46 +00002595
2596
jadmanski0afbb632008-06-06 21:10:57 +00002597 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002598 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002599 blocks = IneligibleHostQueue.fetch(
2600 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2601 for block in blocks:
2602 block.delete()
mblighe2586682008-02-29 22:45:46 +00002603
2604
showard2bab8f42008-11-12 18:15:22 +00002605 def set_execution_subdir(self, subdir=None):
2606 if subdir is None:
2607 assert self.get_host()
2608 subdir = self.get_host().hostname
2609 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002610
2611
showard6355f6b2008-12-05 18:52:13 +00002612 def _get_hostname(self):
2613 if self.host:
2614 return self.host.hostname
2615 return 'no host'
2616
2617
showard170873e2009-01-07 00:22:26 +00002618 def __str__(self):
2619 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2620
2621
jadmanski0afbb632008-06-06 21:10:57 +00002622 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002623 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002624
showardb18134f2009-03-20 20:52:18 +00002625 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002626
showardc85c21b2008-11-24 22:17:37 +00002627 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002628 self.update_field('complete', False)
2629 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002630
jadmanski0afbb632008-06-06 21:10:57 +00002631 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002632 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002633 self.update_field('complete', False)
2634 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002635
showardc85c21b2008-11-24 22:17:37 +00002636 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002637 self.update_field('complete', True)
2638 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002639
2640 should_email_status = (status.lower() in _notify_email_statuses or
2641 'all' in _notify_email_statuses)
2642 if should_email_status:
2643 self._email_on_status(status)
2644
2645 self._email_on_job_complete()
2646
2647
2648 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002649 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002650
2651 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2652 self.job.id, self.job.name, hostname, status)
2653 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2654 self.job.id, self.job.name, hostname, status,
2655 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002656 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002657
2658
2659 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002660 if not self.job.is_finished():
2661 return
showard542e8402008-09-19 20:16:18 +00002662
showardc85c21b2008-11-24 22:17:37 +00002663 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002664 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002665 for queue_entry in hosts_queue:
2666 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002667 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002668 queue_entry.status))
2669
2670 summary_text = "\n".join(summary_text)
2671 status_counts = models.Job.objects.get_status_counts(
2672 [self.job.id])[self.job.id]
2673 status = ', '.join('%d %s' % (count, status) for status, count
2674 in status_counts.iteritems())
2675
2676 subject = 'Autotest: Job ID: %s "%s" %s' % (
2677 self.job.id, self.job.name, status)
2678 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2679 self.job.id, self.job.name, status, self._view_job_url(),
2680 summary_text)
showard170873e2009-01-07 00:22:26 +00002681 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002682
2683
showard77182562009-06-10 00:16:05 +00002684 def run_pre_job_tasks(self, assigned_host=None):
2685 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002686 assert assigned_host
2687 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002688 if self.host_id is None:
2689 self.set_host(assigned_host)
2690 else:
2691 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002692
showardcfd4a7e2009-07-11 01:47:33 +00002693 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002694 self.job.name, self.meta_host, self.atomic_group_id,
2695 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002696
showard77182562009-06-10 00:16:05 +00002697 return self._do_run_pre_job_tasks()
2698
2699
2700 def _do_run_pre_job_tasks(self):
2701 # Every host goes thru the Verifying stage (which may or may not
2702 # actually do anything as determined by get_pre_job_tasks).
2703 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2704
2705 # The pre job tasks always end with a SetEntryPendingTask which
2706 # will continue as appropriate through queue_entry.on_pending().
2707 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002708
showard6ae5ea92009-02-25 00:11:51 +00002709
jadmanski0afbb632008-06-06 21:10:57 +00002710 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002711 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002712 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002713 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002714 # verify/cleanup failure sets the execution subdir, so reset it here
2715 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002716 if self.meta_host:
2717 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002718
2719
jadmanski0afbb632008-06-06 21:10:57 +00002720 def handle_host_failure(self):
2721 """\
2722 Called when this queue entry's host has failed verification and
2723 repair.
2724 """
2725 assert not self.meta_host
2726 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002727 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002728
2729
jadmanskif7fa2cc2008-10-01 14:13:23 +00002730 @property
2731 def aborted_by(self):
2732 self._load_abort_info()
2733 return self._aborted_by
2734
2735
2736 @property
2737 def aborted_on(self):
2738 self._load_abort_info()
2739 return self._aborted_on
2740
2741
2742 def _load_abort_info(self):
2743 """ Fetch info about who aborted the job. """
2744 if hasattr(self, "_aborted_by"):
2745 return
2746 rows = _db.execute("""
2747 SELECT users.login, aborted_host_queue_entries.aborted_on
2748 FROM aborted_host_queue_entries
2749 INNER JOIN users
2750 ON users.id = aborted_host_queue_entries.aborted_by_id
2751 WHERE aborted_host_queue_entries.queue_entry_id = %s
2752 """, (self.id,))
2753 if rows:
2754 self._aborted_by, self._aborted_on = rows[0]
2755 else:
2756 self._aborted_by = self._aborted_on = None
2757
2758
showardb2e2c322008-10-14 17:33:55 +00002759 def on_pending(self):
2760 """
2761 Called when an entry in a synchronous job has passed verify. If the
2762 job is ready to run, returns an agent to run the job. Returns None
2763 otherwise.
2764 """
2765 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002766 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002767 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002768
2769
showardd3dc1992009-04-22 21:01:40 +00002770 def abort(self, dispatcher):
2771 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002772
showardd3dc1992009-04-22 21:01:40 +00002773 Status = models.HostQueueEntry.Status
2774 has_running_job_agent = (
2775 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2776 and dispatcher.get_agents_for_entry(self))
2777 if has_running_job_agent:
2778 # do nothing; post-job tasks will finish and then mark this entry
2779 # with status "Aborted" and take care of the host
2780 return
2781
2782 if self.status in (Status.STARTING, Status.PENDING):
2783 self.host.set_status(models.Host.Status.READY)
2784 elif self.status == Status.VERIFYING:
2785 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2786
2787 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002788
2789 def execution_tag(self):
2790 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002791 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002792
2793
showarded2afea2009-07-07 20:54:07 +00002794 def execution_path(self):
2795 return self.execution_tag()
2796
2797
class Job(DBObject):
    """In-memory representation of a row in the `jobs` table.

    Encapsulates the scheduling logic for one job: querying its
    HostQueueEntries, deciding when it is ready to run, choosing the group
    of hosts to run on, building the autoserv command line, and creating
    the Agent that actually executes the job.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        """Load a job either by primary key (`id`) or from a raw DB `row`."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        """@returns True when this job runs a server-side control file."""
        # NOTE(review): control_type 2 presumably corresponds to the
        # client-side control file type -- confirm against the frontend's
        # control type enum.
        return self.control_type != 2


    def tag(self):
        """@returns the "<id>-<owner>" tag naming this job's results dir."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """@returns all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job has at least one queue entry by construction.
        assert len(entries)>0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status string.

        @param status: New status value.
        @param update_queues: When True, propagate the same status to every
                HostQueueEntry of this job.
        """
        self.update_field('status',status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        """@returns True when enough entries are Pending for the job to run."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than than half the number of
        # machines in the atomic group.
        return (self._pending_count() >= self.synch_count
                and not self._atomic_and_has_started())


    def num_machines(self, clause = None):
        """Count this job's queue entries, optionally filtered by `clause`.

        @param clause: Optional SQL condition ANDed onto the job-id filter.
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """@returns the number of entries that have not yet completed."""
        return self.num_machines('not complete')


    def num_active(self):
        """@returns the number of currently active entries."""
        return self.num_machines('active')


    def num_complete(self):
        """@returns the number of completed entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """@returns True once every entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """@returns a queryset of this job's entries that have not started
        running (Queued/Pending, plus Verifying unless excluded)."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Stop every Queued/Pending entry of this job, releasing any
        Pending hosts back to Ready."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()

    def stop_if_necessary(self):
        """Stop the job's remaining entries if it can no longer gather
        enough hosts to satisfy synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Attach this job's control file to the execution and return the
        path it was written to on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """@returns all entries sharing the given entry's execution subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for running `queue_entries`.

        @param queue_entries: Non-empty list of entries in one host group.
        @returns the argument list to execute autoserv with.
        """
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        # Client-side jobs need autoserv's -c flag.
        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """@returns True if a CleanupTask should precede this job on the
        entry's host, per the job's reboot_before setting."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        """@returns True if the entry's host should be verified before the
        job runs (host protection permitting)."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered be ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign a shared execution subdir to a group of entries.

        Single-host groups use the hostname; multi-host (synchronous)
        groups get the next generated "group<N>" name.
        """
        if len(queue_entries) == 1:
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id, [entry.host.hostname for entry in queue_entries],
                         group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # Atomic groups try to grab as many machines as allowed.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
                are ready for it to run.
        @returns None and potentially cleans up excess hosts if this Job
                is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            # Atomic groups may wait for more hosts before launching.
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            # Fires when the DelayedCallTask's wait expires.
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark the chosen entries Starting and build the Agent that runs
        autoserv for this host group.

        @param queue_entries: The group of entries selected to run.
        @param group_name: Results-database group name for this run.
        @returns An Agent wrapping the QueueTask for this group.
        """
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003175
3176
if __name__ == '__main__':
    # Script entry point: start the scheduler (main() is defined earlier
    # in this file, outside this excerpt).
    main()