blob: d4ec92afe0f12db6d3ad5a1d5788938c4938d8eb [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000024from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000025from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000026from autotest_lib.scheduler import status_server, scheduler_config
showardf13a9e22009-12-18 22:54:09 +000027from autotest_lib.scheduler import gc_stats
mbligh70feeee2008-06-11 16:20:49 +000028
# Pid-file name prefixes for the scheduler itself and its babysitter wrapper.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An AUTOTEST_DIR environment variable overrides the path derived from the
# location of this file.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile names written into execution directories by the three kinds of
# processes the scheduler launches.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized in init() / main().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000067
68
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    # The config option is expressed in minutes; callers expect seconds.
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
74
75
mbligh83c1e9e2009-05-01 23:10:41 +000076def _site_init_monitor_db_dummy():
77 return {}
78
79
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any unexpected exception before it
    escapes, and always removes this process's pid file on the way out so
    a subsequent scheduler instance can start.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (sys.exit) propagate untouched.
            raise
        except:
            # Record anything else before re-raising so the failure is
            # captured in the log even if stderr is lost.
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000091
92
93def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000094 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000095
showard136e6dc2009-06-10 19:38:49 +000096 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000097 parser = optparse.OptionParser(usage)
98 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
99 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +0000100 parser.add_option('--test', help='Indicate that scheduler is under ' +
101 'test and should use dummy autoserv and no parsing',
102 action='store_true')
103 (options, args) = parser.parse_args()
104 if len(args) != 1:
105 parser.print_usage()
106 return
mbligh36768f02008-02-22 18:28:33 +0000107
showard5613c662009-06-08 23:30:33 +0000108 scheduler_enabled = global_config.global_config.get_config_value(
109 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
110
111 if not scheduler_enabled:
112 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
113 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000114 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000115 sys.exit(1)
116
jadmanski0afbb632008-06-06 21:10:57 +0000117 global RESULTS_DIR
118 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000119
mbligh83c1e9e2009-05-01 23:10:41 +0000120 site_init = utils.import_site_function(__file__,
121 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
122 _site_init_monitor_db_dummy)
123 site_init()
124
showardcca334f2009-03-12 20:38:34 +0000125 # Change the cwd while running to avoid issues incase we were launched from
126 # somewhere odd (such as a random NFS home directory of the person running
127 # sudo to launch us as the appropriate user).
128 os.chdir(RESULTS_DIR)
129
jadmanski0afbb632008-06-06 21:10:57 +0000130 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000131 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
132 "notify_email_statuses",
133 default='')
showardc85c21b2008-11-24 22:17:37 +0000134 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000135 _notify_email_statuses = [status for status in
136 re.split(r'[\s,;:]', notify_statuses_list.lower())
137 if status]
showardc85c21b2008-11-24 22:17:37 +0000138
jadmanski0afbb632008-06-06 21:10:57 +0000139 if options.test:
140 global _autoserv_path
141 _autoserv_path = 'autoserv_dummy'
142 global _testing_mode
143 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000144
mbligh37eceaa2008-12-15 22:56:37 +0000145 # AUTOTEST_WEB.base_url is still a supported config option as some people
146 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000147 global _base_url
showard170873e2009-01-07 00:22:26 +0000148 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
149 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000150 if config_base_url:
151 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000152 else:
mbligh37eceaa2008-12-15 22:56:37 +0000153 # For the common case of everything running on a single server you
154 # can just set the hostname in a single place in the config file.
155 server_name = c.get_config_value('SERVER', 'hostname')
156 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000157 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000158 sys.exit(1)
159 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000160
showardc5afc462009-01-13 00:09:39 +0000161 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000162 server.start()
163
jadmanski0afbb632008-06-06 21:10:57 +0000164 try:
showard136e6dc2009-06-10 19:38:49 +0000165 init()
showardc5afc462009-01-13 00:09:39 +0000166 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000167 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000168
jadmanski0afbb632008-06-06 21:10:57 +0000169 while not _shutdown:
170 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000171 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000172 except:
showard170873e2009-01-07 00:22:26 +0000173 email_manager.manager.log_stacktrace(
174 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000175
showard170873e2009-01-07 00:22:26 +0000176 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000177 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000178 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000179 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000180
181
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and log file name."""
    env = os.environ
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(
            config,
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
188
189
def handle_sigint(signum, frame):
    """SIGINT handler: request that the main loop exit after its current
    tick by setting the module-level _shutdown flag."""
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000194
195
def init():
    """One-time scheduler startup.

    Aborts if another monitor_db is already running, writes our pid file,
    connects the scheduler and Django database connections, installs the
    SIGINT handler, and initializes the drone manager from config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Only one scheduler instance may run at a time.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point the database layer at the stress-test database instead of
        # the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Drone hosts and the results repository host come from the scheduler
    # config section, both defaulting to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000230
231
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    # Base invocation: autoserv writing pidfiles (-p) into the drone
    # working directory (-r).
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        owning_job = job if job else queue_entry.job
        command.extend(['-u', owning_job.owner, '-l', owning_job.name])
    if verbose:
        command += ['--verbose']
    return command + extra_args
256
257
class SchedulerError(Exception):
    """Error raised by HostScheduler when scheduler state is inconsistent."""
260
261
class HostScheduler(object):
    """Matches pending HostQueueEntries to ready hosts.

    refresh() snapshots scheduling state from the database into in-memory
    dicts (ready hosts, job/host ACLs, job dependencies, label<->host maps,
    labels).  The find_eligible_* methods then consume that snapshot,
    popping hosts out of self._hosts_available as they are assigned so each
    host is handed out at most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready/NULL-status hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated SQL id list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with a %s id-list placeholder) and fold
        the rows into a {left_id: set(right_id)} mapping (see
        _process_many2many_dict); flip swaps the columns."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
            SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
            FROM afe_jobs
            INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
            INNER JOIN afe_acl_groups_users ON
                    afe_acl_groups_users.user_id = afe_users.id
            WHERE afe_jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts barred from each job."""
        query = """
            SELECT job_id, host_id
            FROM afe_ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
            SELECT job_id, label_id
            FROM afe_jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
            SELECT host_id, aclgroup_id
            FROM afe_acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings — the same
        label/host rows folded in both directions."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM afe_hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling state needed to place the given pending
        queue entries.  Must be called before the find_eligible_* methods."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found in the set of labels, an error is logged and an arbitrary one
        of them is returned.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency, only_if_needed and
        atomic-group checks for this entry (invalid hosts bypass all)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is known in this cycle and flagged invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; None otherwise."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first usable, eligible host carrying the entry's
        metahost label; None if no such host exists."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Claim and return a Host for a non-atomic-group entry, dispatching
        on whether the entry is a metahost; None if nothing is available."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
627
628
showard170873e2009-01-07 00:22:26 +0000629class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000630 def __init__(self):
631 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000632 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000633 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000634 user_cleanup_time = scheduler_config.config.clean_interval
635 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
636 _db, user_cleanup_time)
637 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000638 self._host_agents = {}
639 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000640 self._tick_count = 0
641 self._last_garbage_stats_time = time.time()
642 self._seconds_between_garbage_stats = 60 * (
643 global_config.global_config.get_config_value(
644 scheduler_config.CONFIG_SECTION,
645 'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard915958d2009-04-22 21:00:58 +0000648 def initialize(self, recover_hosts=True):
649 self._periodic_cleanup.initialize()
650 self._24hr_upkeep.initialize()
651
jadmanski0afbb632008-06-06 21:10:57 +0000652 # always recover processes
653 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000654
jadmanski0afbb632008-06-06 21:10:57 +0000655 if recover_hosts:
656 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000660 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000661 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000662 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000663 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000664 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000665 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000666 self._schedule_running_host_queue_entries()
667 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000668 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000669 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000670 _drone_manager.execute_actions()
671 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000672 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000673 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000674
showard97aed502008-11-04 02:01:24 +0000675
mblighf3294cc2009-04-08 21:17:38 +0000676 def _run_cleanup(self):
677 self._periodic_cleanup.run_cleanup_maybe()
678 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000679
mbligh36768f02008-02-22 18:28:33 +0000680
showardf13a9e22009-12-18 22:54:09 +0000681 def _garbage_collection(self):
682 threshold_time = time.time() - self._seconds_between_garbage_stats
683 if threshold_time < self._last_garbage_stats_time:
684 # Don't generate these reports very often.
685 return
686
687 self._last_garbage_stats_time = time.time()
688 # Force a full level 0 collection (because we can, it doesn't hurt
689 # at this interval).
690 gc.collect()
691 logging.info('Logging garbage collector stats on tick %d.',
692 self._tick_count)
693 gc_stats._log_garbage_collector_stats()
694
695
showard170873e2009-01-07 00:22:26 +0000696 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
697 for object_id in object_ids:
698 agent_dict.setdefault(object_id, set()).add(agent)
699
700
701 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
702 for object_id in object_ids:
703 assert object_id in agent_dict
704 agent_dict[object_id].remove(agent)
705
706
showardd1195652009-12-08 22:21:02 +0000707 def add_agent_task(self, agent_task):
708 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000709 self._agents.append(agent)
710 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000711 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
712 self._register_agent_for_ids(self._queue_entry_agents,
713 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000714
showard170873e2009-01-07 00:22:26 +0000715
716 def get_agents_for_entry(self, queue_entry):
717 """
718 Find agents corresponding to the specified queue_entry.
719 """
showardd3dc1992009-04-22 21:01:40 +0000720 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000721
722
723 def host_has_agent(self, host):
724 """
725 Determine if there is currently an Agent present using this host.
726 """
727 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000728
729
jadmanski0afbb632008-06-06 21:10:57 +0000730 def remove_agent(self, agent):
731 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000732 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
733 agent)
734 self._unregister_agent_for_ids(self._queue_entry_agents,
735 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000736
737
showard8cc058f2009-09-08 16:26:33 +0000738 def _host_has_scheduled_special_task(self, host):
739 return bool(models.SpecialTask.objects.filter(host__id=host.id,
740 is_active=False,
741 is_complete=False))
742
743
jadmanski0afbb632008-06-06 21:10:57 +0000744 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000745 agent_tasks = self._create_recovery_agent_tasks()
746 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000747 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000748 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000749 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000750 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000751 self._reverify_remaining_hosts()
752 # reinitialize drones after killing orphaned processes, since they can
753 # leave around files when they die
754 _drone_manager.execute_actions()
755 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000756
showard170873e2009-01-07 00:22:26 +0000757
showardd1195652009-12-08 22:21:02 +0000758 def _create_recovery_agent_tasks(self):
759 return (self._get_queue_entry_agent_tasks()
760 + self._get_special_task_agent_tasks(is_active=True))
761
762
763 def _get_queue_entry_agent_tasks(self):
764 # host queue entry statuses handled directly by AgentTasks (Verifying is
765 # handled through SpecialTasks, so is not listed here)
766 statuses = (models.HostQueueEntry.Status.STARTING,
767 models.HostQueueEntry.Status.RUNNING,
768 models.HostQueueEntry.Status.GATHERING,
769 models.HostQueueEntry.Status.PARSING)
770 status_list = ','.join("'%s'" % status for status in statuses)
showard170873e2009-01-07 00:22:26 +0000771 queue_entries = HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000772 where='status IN (%s)' % status_list)
773
774 agent_tasks = []
775 used_queue_entries = set()
776 for entry in queue_entries:
777 if self.get_agents_for_entry(entry):
778 # already being handled
779 continue
780 if entry in used_queue_entries:
781 # already picked up by a synchronous job
782 continue
783 agent_task = self._get_agent_task_for_queue_entry(entry)
784 agent_tasks.append(agent_task)
785 used_queue_entries.update(agent_task.queue_entries)
786 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000787
788
showardd1195652009-12-08 22:21:02 +0000789 def _get_special_task_agent_tasks(self, is_active=False):
790 special_tasks = models.SpecialTask.objects.filter(
791 is_active=is_active, is_complete=False)
792 return [self._get_agent_task_for_special_task(task)
793 for task in special_tasks]
794
795
796 def _get_agent_task_for_queue_entry(self, queue_entry):
797 """
798 Construct an AgentTask instance for the given active HostQueueEntry,
799 if one can currently run it.
800 @param queue_entry: a HostQueueEntry
801 @returns an AgentTask to run the queue entry
802 """
803 task_entries = queue_entry.job.get_group_entries(queue_entry)
804 self._check_for_duplicate_host_entries(task_entries)
805
806 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
807 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000808 if queue_entry.is_hostless():
809 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000810 return QueueTask(queue_entries=task_entries)
811 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
812 return GatherLogsTask(queue_entries=task_entries)
813 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
814 return FinalReparseTask(queue_entries=task_entries)
815
816 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
817 'invalid status %s: %s' % (entry.status, entry))
818
819
820 def _check_for_duplicate_host_entries(self, task_entries):
showarda9545c02009-12-18 22:44:26 +0000821 parsing_status = models.HostQueueEntry.Status.PARSING
showardd1195652009-12-08 22:21:02 +0000822 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000823 using_host = (task_entry.host is not None
824 and task_entry.status != parsing_status)
825 if using_host:
showardd1195652009-12-08 22:21:02 +0000826 self._assert_host_has_no_agent(task_entry)
827
828
829 def _assert_host_has_no_agent(self, entry):
830 """
831 @param entry: a HostQueueEntry or a SpecialTask
832 """
833 if self.host_has_agent(entry.host):
834 agent = tuple(self._host_agents.get(entry.host.id))[0]
835 raise SchedulerError(
836 'While scheduling %s, host %s already has a host agent %s'
837 % (entry, entry.host, agent.task))
838
839
840 def _get_agent_task_for_special_task(self, special_task):
841 """
842 Construct an AgentTask class to run the given SpecialTask and add it
843 to this dispatcher.
844 @param special_task: a models.SpecialTask instance
845 @returns an AgentTask to run this SpecialTask
846 """
847 self._assert_host_has_no_agent(special_task)
848
849 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
850 for agent_task_class in special_agent_task_classes:
851 if agent_task_class.TASK_TYPE == special_task.task:
852 return agent_task_class(task=special_task)
853
854 raise SchedulerError('No AgentTask class for task', str(special_task))
855
856
857 def _register_pidfiles(self, agent_tasks):
858 for agent_task in agent_tasks:
859 agent_task.register_necessary_pidfiles()
860
861
862 def _recover_tasks(self, agent_tasks):
863 orphans = _drone_manager.get_orphaned_autoserv_processes()
864
865 for agent_task in agent_tasks:
866 agent_task.recover()
867 if agent_task.monitor and agent_task.monitor.has_process():
868 orphans.discard(agent_task.monitor.get_process())
869 self.add_agent_task(agent_task)
870
871 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000872
873
showard8cc058f2009-09-08 16:26:33 +0000874 def _get_unassigned_entries(self, status):
875 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000876 if entry.status == status and not self.get_agents_for_entry(entry):
877 # The status can change during iteration, e.g., if job.run()
878 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000879 yield entry
880
881
showard6878e8b2009-07-20 22:37:45 +0000882 def _check_for_remaining_orphan_processes(self, orphans):
883 if not orphans:
884 return
885 subject = 'Unrecovered orphan autoserv processes remain'
886 message = '\n'.join(str(process) for process in orphans)
887 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000888
889 die_on_orphans = global_config.global_config.get_config_value(
890 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
891
892 if die_on_orphans:
893 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000894
showard170873e2009-01-07 00:22:26 +0000895
showard8cc058f2009-09-08 16:26:33 +0000896 def _recover_pending_entries(self):
897 for entry in self._get_unassigned_entries(
898 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000899 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000900 entry.on_pending()
901
902
showardb8900452009-10-12 20:31:01 +0000903 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000904 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000905 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
906 unrecovered_hqes = []
907 for queue_entry in queue_entries:
908 special_tasks = models.SpecialTask.objects.filter(
909 task__in=(models.SpecialTask.Task.CLEANUP,
910 models.SpecialTask.Task.VERIFY),
911 queue_entry__id=queue_entry.id,
912 is_complete=False)
913 if special_tasks.count() == 0:
914 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000915
showardb8900452009-10-12 20:31:01 +0000916 if unrecovered_hqes:
917 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000918 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000919 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000920 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000921
922
showard65db3932009-10-28 19:54:35 +0000923 def _get_prioritized_special_tasks(self):
924 """
925 Returns all queued SpecialTasks prioritized for repair first, then
926 cleanup, then verify.
927 """
928 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
929 is_complete=False,
930 host__locked=False)
931 # exclude hosts with active queue entries unless the SpecialTask is for
932 # that queue entry
933 queued_tasks = models.Host.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000934 queued_tasks, 'afe_host_queue_entries', 'host_id',
935 join_condition='afe_host_queue_entries.active',
showard65db3932009-10-28 19:54:35 +0000936 force_left_join=True)
937 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000938 where=['(afe_host_queue_entries.id IS NULL OR '
939 'afe_host_queue_entries.id = '
940 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000941
showard65db3932009-10-28 19:54:35 +0000942 # reorder tasks by priority
943 task_priority_order = [models.SpecialTask.Task.REPAIR,
944 models.SpecialTask.Task.CLEANUP,
945 models.SpecialTask.Task.VERIFY]
946 def task_priority_key(task):
947 return task_priority_order.index(task.task)
948 return sorted(queued_tasks, key=task_priority_key)
949
950
showard65db3932009-10-28 19:54:35 +0000951 def _schedule_special_tasks(self):
952 """
953 Execute queued SpecialTasks that are ready to run on idle hosts.
954 """
955 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000956 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000957 continue
showardd1195652009-12-08 22:21:02 +0000958 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000959
960
showard170873e2009-01-07 00:22:26 +0000961 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000962 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000963 # should never happen
showarded2afea2009-07-07 20:54:07 +0000964 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000965 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000966 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000967 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000968 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000969
970
jadmanski0afbb632008-06-06 21:10:57 +0000971 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000972 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000973 full_where='locked = 0 AND invalid = 0 AND ' + where
974 for host in Host.fetch(where=full_where):
975 if self.host_has_agent(host):
976 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000977 continue
showard8cc058f2009-09-08 16:26:33 +0000978 if self._host_has_scheduled_special_task(host):
979 # host will have a special task scheduled on the next cycle
980 continue
showard170873e2009-01-07 00:22:26 +0000981 if print_message:
showardb18134f2009-03-20 20:52:18 +0000982 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000983 models.SpecialTask.objects.create(
984 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000985 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000986
987
jadmanski0afbb632008-06-06 21:10:57 +0000988 def _recover_hosts(self):
989 # recover "Repair Failed" hosts
990 message = 'Reverifying dead host %s'
991 self._reverify_hosts_where("status = 'Repair Failed'",
992 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000993
994
showard04c82c52008-05-29 19:38:12 +0000995
showardb95b1bd2008-08-15 18:11:04 +0000996 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000997 # prioritize by job priority, then non-metahost over metahost, then FIFO
998 return list(HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +0000999 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001000 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001001 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001002
1003
showard89f84db2009-03-12 20:39:13 +00001004 def _refresh_pending_queue_entries(self):
1005 """
1006 Lookup the pending HostQueueEntries and call our HostScheduler
1007 refresh() method given that list. Return the list.
1008
1009 @returns A list of pending HostQueueEntries sorted in priority order.
1010 """
showard63a34772008-08-18 19:32:50 +00001011 queue_entries = self._get_pending_queue_entries()
1012 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001013 return []
showardb95b1bd2008-08-15 18:11:04 +00001014
showard63a34772008-08-18 19:32:50 +00001015 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001016
showard89f84db2009-03-12 20:39:13 +00001017 return queue_entries
1018
1019
1020 def _schedule_atomic_group(self, queue_entry):
1021 """
1022 Schedule the given queue_entry on an atomic group of hosts.
1023
1024 Returns immediately if there are insufficient available hosts.
1025
1026 Creates new HostQueueEntries based off of queue_entry for the
1027 scheduled hosts and starts them all running.
1028 """
1029 # This is a virtual host queue entry representing an entire
1030 # atomic group, find a group and schedule their hosts.
1031 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1032 queue_entry)
1033 if not group_hosts:
1034 return
showardcbe6f942009-06-17 19:33:49 +00001035
1036 logging.info('Expanding atomic group entry %s with hosts %s',
1037 queue_entry,
1038 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001039 # The first assigned host uses the original HostQueueEntry
1040 group_queue_entries = [queue_entry]
1041 for assigned_host in group_hosts[1:]:
1042 # Create a new HQE for every additional assigned_host.
1043 new_hqe = HostQueueEntry.clone(queue_entry)
1044 new_hqe.save()
1045 group_queue_entries.append(new_hqe)
1046 assert len(group_queue_entries) == len(group_hosts)
1047 for queue_entry, host in itertools.izip(group_queue_entries,
1048 group_hosts):
1049 self._run_queue_entry(queue_entry, host)
1050
1051
showarda9545c02009-12-18 22:44:26 +00001052 def _schedule_hostless_job(self, queue_entry):
1053 self.add_agent_task(HostlessQueueTask(queue_entry))
1054
1055
showard89f84db2009-03-12 20:39:13 +00001056 def _schedule_new_jobs(self):
1057 queue_entries = self._refresh_pending_queue_entries()
1058 if not queue_entries:
1059 return
1060
showard63a34772008-08-18 19:32:50 +00001061 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001062 is_unassigned_atomic_group = (
1063 queue_entry.atomic_group_id is not None
1064 and queue_entry.host_id is None)
1065 if is_unassigned_atomic_group:
1066 self._schedule_atomic_group(queue_entry)
showarda9545c02009-12-18 22:44:26 +00001067 elif queue_entry.is_hostless():
1068 self._schedule_hostless_job(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001069 else:
showard89f84db2009-03-12 20:39:13 +00001070 assigned_host = self._host_scheduler.find_eligible_host(
1071 queue_entry)
showard65db3932009-10-28 19:54:35 +00001072 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001073 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001074
1075
showard8cc058f2009-09-08 16:26:33 +00001076 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001077 for agent_task in self._get_queue_entry_agent_tasks():
1078 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001079
1080
1081 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001082 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1083 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001084 task = entry.job.schedule_delayed_callback_task(entry)
1085 if task:
showardd1195652009-12-08 22:21:02 +00001086 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001087
1088
showardb95b1bd2008-08-15 18:11:04 +00001089 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001090 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001091
1092
jadmanski0afbb632008-06-06 21:10:57 +00001093 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001094 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001095 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001096 for agent in self.get_agents_for_entry(entry):
1097 agent.abort()
1098 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001099
1100
showard324bf812009-01-20 23:23:38 +00001101 def _can_start_agent(self, agent, num_started_this_cycle,
1102 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001103 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001104 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001105 return True
1106 # don't allow any nonzero-process agents to run after we've reached a
1107 # limit (this avoids starvation of many-process agents)
1108 if have_reached_limit:
1109 return False
1110 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001111 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001112 agent.task.owner_username)
1113 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001114 return False
1115 # if a single agent exceeds the per-cycle throttling, still allow it to
1116 # run when it's the first agent in the cycle
1117 if num_started_this_cycle == 0:
1118 return True
1119 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001120 if (num_started_this_cycle + agent.task.num_processes >
1121 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001122 return False
1123 return True
1124
1125
jadmanski0afbb632008-06-06 21:10:57 +00001126 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001127 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001128 have_reached_limit = False
1129 # iterate over copy, so we can remove agents during iteration
1130 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001131 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001132 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001133 have_reached_limit):
1134 have_reached_limit = True
1135 continue
showardd1195652009-12-08 22:21:02 +00001136 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001137 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001138 if agent.is_done():
1139 logging.info("agent finished")
1140 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001141 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001142 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001143
1144
showard29f7cd22009-04-29 21:16:24 +00001145 def _process_recurring_runs(self):
1146 recurring_runs = models.RecurringRun.objects.filter(
1147 start_date__lte=datetime.datetime.now())
1148 for rrun in recurring_runs:
1149 # Create job from template
1150 job = rrun.job
1151 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001152 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001153
1154 host_objects = info['hosts']
1155 one_time_hosts = info['one_time_hosts']
1156 metahost_objects = info['meta_hosts']
1157 dependencies = info['dependencies']
1158 atomic_group = info['atomic_group']
1159
1160 for host in one_time_hosts or []:
1161 this_host = models.Host.create_one_time_host(host.hostname)
1162 host_objects.append(this_host)
1163
1164 try:
1165 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001166 options=options,
showard29f7cd22009-04-29 21:16:24 +00001167 host_objects=host_objects,
1168 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001169 atomic_group=atomic_group)
1170
1171 except Exception, ex:
1172 logging.exception(ex)
1173 #TODO send email
1174
1175 if rrun.loop_count == 1:
1176 rrun.delete()
1177 else:
1178 if rrun.loop_count != 0: # if not infinite loop
1179 # calculate new start_date
1180 difference = datetime.timedelta(seconds=rrun.loop_period)
1181 rrun.start_date = rrun.start_date + difference
1182 rrun.loop_count -= 1
1183 rrun.save()
1184
1185
showard170873e2009-01-07 00:22:26 +00001186class PidfileRunMonitor(object):
1187 """
1188 Client must call either run() to start a new process or
1189 attach_to_existing_process().
1190 """
mbligh36768f02008-02-22 18:28:33 +00001191
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file
        (e.g. unparseable contents), but only used internally (never
        allowed to escape this class -- _get_pidfile_info() catches it).
        """
mbligh36768f02008-02-22 18:28:33 +00001197
1198
showard170873e2009-01-07 00:22:26 +00001199 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001200 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001201 self._start_time = None
1202 self.pidfile_id = None
1203 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001204
1205
showard170873e2009-01-07 00:22:26 +00001206 def _add_nice_command(self, command, nice_level):
1207 if not nice_level:
1208 return command
1209 return ['nice', '-n', str(nice_level)] + command
1210
1211
1212 def _set_start_time(self):
1213 self._start_time = time.time()
1214
1215
showard418785b2009-11-23 20:19:59 +00001216 def run(self, command, working_directory, num_processes, nice_level=None,
1217 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1218 username=None):
showard170873e2009-01-07 00:22:26 +00001219 assert command is not None
1220 if nice_level is not None:
1221 command = ['nice', '-n', str(nice_level)] + command
1222 self._set_start_time()
1223 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001224 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001225 num_processes=num_processes, log_file=log_file,
1226 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001227
1228
showarded2afea2009-07-07 20:54:07 +00001229 def attach_to_existing_process(self, execution_path,
showardd1195652009-12-08 22:21:02 +00001230 pidfile_name=_AUTOSERV_PID_FILE,
1231 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001232 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001233 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001234 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001235 if num_processes is not None:
1236 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001237
1238
jadmanski0afbb632008-06-06 21:10:57 +00001239 def kill(self):
showard170873e2009-01-07 00:22:26 +00001240 if self.has_process():
1241 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001242
mbligh36768f02008-02-22 18:28:33 +00001243
showard170873e2009-01-07 00:22:26 +00001244 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001245 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001246 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001247
1248
showard170873e2009-01-07 00:22:26 +00001249 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001250 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001251 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001252 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001253
1254
showard170873e2009-01-07 00:22:26 +00001255 def _read_pidfile(self, use_second_read=False):
1256 assert self.pidfile_id is not None, (
1257 'You must call run() or attach_to_existing_process()')
1258 contents = _drone_manager.get_pidfile_contents(
1259 self.pidfile_id, use_second_read=use_second_read)
1260 if contents.is_invalid():
1261 self._state = drone_manager.PidfileContents()
1262 raise self._PidfileException(contents)
1263 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001264
1265
showard21baa452008-10-21 00:08:39 +00001266 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001267 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1268 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001269 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001270 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001271
1272
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, handling the race where the
        process exits between our reads.  May raise _PidfileException (via
        _read_pidfile); the caller (_get_pidfile_info) catches it.
        """
        # Once declared lost, the synthetic failure state is final.
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001295
showard21baa452008-10-21 00:08:39 +00001296
1297 def _get_pidfile_info(self):
1298 """\
1299 After completion, self._state will contain:
1300 pid=None, exit_status=None if autoserv has not yet run
1301 pid!=None, exit_status=None if autoserv is running
1302 pid!=None, exit_status!=None if autoserv has completed
1303 """
1304 try:
1305 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001306 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001307 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001308
1309
showard170873e2009-01-07 00:22:26 +00001310 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001311 """\
1312 Called when no pidfile is found or no pid is in the pidfile.
1313 """
showard170873e2009-01-07 00:22:26 +00001314 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001315 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001316 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001317 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001318 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001319
1320
showard35162b02009-03-03 02:17:30 +00001321 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001322 """\
1323 Called when autoserv has exited without writing an exit status,
1324 or we've timed out waiting for autoserv to write a pid to the
1325 pidfile. In either case, we just return failure and the caller
1326 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001327
showard170873e2009-01-07 00:22:26 +00001328 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001329 """
1330 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001331 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001332 self._state.exit_status = 1
1333 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001334
1335
    def exit_code(self):
        """Return the process's exit status, or None while it is still
        running.  Refreshes pidfile state before answering."""
        self._get_pidfile_info()
        return self._state.exit_status
1339
1340
1341 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001342 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001343 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001344 if self._state.num_tests_failed is None:
1345 return -1
showard21baa452008-10-21 00:08:39 +00001346 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001347
1348
showardcdaeae82009-08-31 18:32:48 +00001349 def try_copy_results_on_drone(self, **kwargs):
1350 if self.has_process():
1351 # copy results logs into the normal place for job results
1352 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1353
1354
1355 def try_copy_to_results_repository(self, source, **kwargs):
1356 if self.has_process():
1357 _drone_manager.copy_to_results_repository(self.get_process(),
1358 source, **kwargs)
1359
1360
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The wrapped task object must provide:
        poll() - called periodically so the task can check its status and
                update its internal state.
        is_done() - returns True once the task has finished.
        abort() - called when an abort is requested; the task must set its
                aborted attribute to True if it actually aborted.

    and must expose these attributes:
        aborted - bool, True if the task was aborted.
        success - bool, True if the task succeeded.
        queue_entry_ids - sequence of HostQueueEntry ids the task handles.
        host_ids - sequence of Host ids the task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # Filled in later by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001414
showardd3dc1992009-04-22 21:01:40 +00001415
class DelayedCallTask(object):
    """
    An AgentTask-like object for an Agent to run that waits until at least
    the requested amount of time has passed, then invokes the supplied
    callback exactly once and finishes.  The callback's return value is
    ignored by poll() itself.

    @attribute end_time: absolute posix time after which a poll() will fire
            the callback and mark this task done.

    Also carries all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: how long (in seconds) from now before the
                callback is invoked and the task completes.
        @param callback: a callable invoked once after at least
                delay_seconds have elapsed.  It must return None or a new
                Agent instance.
        @param now_func: a time.time-like function, default time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes required by the Agent interface.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001467
1468
class AgentTask(object):
    """
    Abstract base class for tasks run by an Agent for the Dispatcher.  Each
    task wraps one external process, launched and watched through a
    PidfileRunMonitor (self.monitor).

    Subclasses must override _command_line(), _working_directory() and
    owner_username; they may also override prolog(), epilog(),
    num_processes, _paired_with_monitor() and _pidfile_name().
    """

    class _NullMonitor(object):
        """Stand-in monitor for tasks with no paired predecessor process;
        always claims a process exists so pairing checks pass."""
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        # lifecycle flags: started -> done, with success True/False
        # (None until the task completes)
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        # PidfileRunMonitor for our process; set by run() or recover()
        self.monitor = None
        # filled in via _set_ids() by subclasses
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host and queue-entry IDs this task covers, taken from
        the queue entries when given, otherwise from the single host."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Periodic entry point called by the Agent: lazily start the task,
        then check on its process."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check whether our process has exited; if so, finish with success
        iff the exit code was zero."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete with the given success value and run the
        epilog.  Idempotent: later calls are ignored."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.  Runs once, just before the process is launched.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        """Return the full path of the command-output log, or None if no
        log file name was configured."""
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Copy the command-output log into the results repository, when we
        have both a monitor and a log file."""
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.  Runs once after the task finishes, successfully
        or not.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)



    def start(self):
        """Run the prolog and launch the process on the first call;
        afterwards just ensure the started flag is set."""
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark this task done and aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # all entries must agree on a single execution path; return it
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the (shared) execution directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) monitor to copy with; defaults to
                self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # flag the entries for the results parser by setting their status
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        """Convenience wrapper: copy results, then mark entries for
        parsing."""
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def _command_line(self):
        """
        Return the command line to run. Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process. It
        may only be approximate. To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task. May be None. Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes. Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses. To be
        overridden if necessary.
        """
        return _AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        """Verify the paired process's results exist.  On failure, send a
        warning email and finish this task unsuccessfully.
        @returns True when the paired results exist (or nothing is paired).
        """
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        # each task gets exactly one monitor for its lifetime
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch this task's process through the drone manager."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username)


    def register_necessary_pidfiles(self):
        """Ask the drone manager to track our pidfile and, if present, the
        paired process's pidfile."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Reattach to an already-running process after a scheduler restart.
        If no such process is found, clear the monitor so the task will be
        started normally later."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001722
1723
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files.

    Expects the mixing class to provide self.monitor and
    _working_directory(); subclasses must override _keyval_path().
    """
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single 'key=value' keyval line (without newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # Bugfix: this used to `raise NotImplemented`, which is not an
        # exception (raising it fails with a confusing error instead of
        # signalling "override me").
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the keyval file, paired with our
        process.  No-op if the process was lost."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <queued timestamp>) keyval pair."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job's finish time keyval (now)."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as keyval file contents at keyval_path, to be
        written when the job's execution directory is created."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Attach keyval_dict to the job's own keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write platform/label keyvals for the given host under
        host_keyvals/<hostname> in the working directory."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1773
1774
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # subclasses must set this to the models.SpecialTask.Task value they run
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: a models.SpecialTask instance that this AgentTask
                executes.
        @param extra_command_args: extra arguments to append to the autoserv
                command line.
        """
        super(SpecialAgentTask, self).__init__()

        # Bugfix: this assertion used to wrap its condition and message in
        # parentheses, asserting a non-empty tuple -- which is always true,
        # so the check never fired.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        """Login of the user who requested this special task, if any."""
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # mark the SpecialTask row active and record host keyvals
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail this task's queue entry: write its keyvals, copy results
        into the job's execution directory and either mark it FAILED or
        hand it to the parser, depending on the job's
        parse_failed_repair setting."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])

        if not self.queue_entry.job.parse_failed_repair:
            self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
            return

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)
        self._parse_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001871
1872
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        @param task: the repair SpecialTask; its queue entry, if any, is
                marked failed when this repair fails.
        """
        protection_name = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        protection_name = host_protections.Protection.get_attr_name(
                protection_name)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
showardde634ee2009-01-30 01:44:24 +00001908 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001909
1910
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run before a job on a host (e.g. verify).
    Centralizes the failure handling: saving the autoserv debug log into
    the job's results and scheduling a repair when the pre-job step fails.
    """
    def _copy_to_results_repository(self):
        """Copy this task's autoserv debug log into the queue entry's
        execution directory in the results repository.  Skipped for
        metahost entries and tasks without a queue entry."""
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            # put the entry back in the queue so it can be rescheduled
            self.queue_entry.requeue()

            # if a repair for this entry is already queued, this entry has
            # already gone through a failed pre-job/repair cycle -- give up
            # on it
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        # schedule a repair of the host, tied to the queue entry (if any)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001960
showard8fe93b52008-11-18 17:53:22 +00001961
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # This verify supersedes any other queued (inactive, incomplete)
        # verifies for the host, so drop their records.
        superseded = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        superseded.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()

        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001996
1997
showarda9545c02009-12-18 22:44:26 +00001998class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1999 """
2000 Common functionality for QueueTask and HostlessQueueTask
2001 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: HostQueueEntry objects this task runs over.
                The first entry's job defines self.job; all entries are
                expected to share a single execution path.
        """
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002006
2007
    def _keyval_path(self):
        # job-level keyvals live in the keyval file inside the execution
        # directory
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002010
2011
    def _command_line(self):
        # the job computes the autoserv invocation from its queue entries
        return self.job.get_autoserv_params(self.queue_entries)
2014
2015
    @property
    def num_processes(self):
        # approximation: one process per queue entry in this job group
        return len(self.queue_entries)
2019
2020
    @property
    def owner_username(self):
        # the job owner's login; used when launching the process
        return self.job.owner
2024
2025
    def _working_directory(self):
        # all queue entries must agree on one execution path (asserted)
        return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002028
2029
jadmanski0afbb632008-06-06 21:10:57 +00002030 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002031 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00002032 keyval_dict = {queued_key: queued_time}
showardd1195652009-12-08 22:21:02 +00002033 group_name = self.queue_entries[0].get_group_name()
2034 if group_name:
2035 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002036 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002037 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002038 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002039 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002040
2041
showard35162b02009-03-03 02:17:30 +00002042 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002043 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002044 _drone_manager.write_lines_to_file(error_file_path,
2045 [_LOST_PROCESS_ERROR])
2046
2047
    def _finish_task(self):
        """End-of-job bookkeeping: record the job_finished keyval and, if
        the process vanished, write the failure marker file."""
        # nothing to record if the process never ran
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002056
jadmanskif7fa2cc2008-10-01 14:13:23 +00002057
    def _write_status_comment(self, comment):
        """Append an INFO line with the given comment to the job's
        status.log, paired with the job's process."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002063
2064
jadmanskif7fa2cc2008-10-01 14:13:23 +00002065 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002066 if not self.monitor or not self.monitor.has_process():
2067 return
2068
jadmanskif7fa2cc2008-10-01 14:13:23 +00002069 # build up sets of all the aborted_by and aborted_on values
2070 aborted_by, aborted_on = set(), set()
2071 for queue_entry in self.queue_entries:
2072 if queue_entry.aborted_by:
2073 aborted_by.add(queue_entry.aborted_by)
2074 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2075 aborted_on.add(t)
2076
2077 # extract some actual, unique aborted by value and write it out
2078 assert len(aborted_by) <= 1
2079 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002080 aborted_by_value = aborted_by.pop()
2081 aborted_on_value = max(aborted_on)
2082 else:
2083 aborted_by_value = 'autotest_system'
2084 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002085
showarda0382352009-02-11 23:36:43 +00002086 self._write_keyval_after_job("aborted_by", aborted_by_value)
2087 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002088
showardcbd74612008-11-19 21:42:02 +00002089 aborted_on_string = str(datetime.datetime.fromtimestamp(
2090 aborted_on_value))
2091 self._write_status_comment('Job aborted by %s on %s' %
2092 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002093
2094
    def abort(self):
        # Run the base-class abort first, then record the abort details
        # (who/when) and perform the final result bookkeeping.
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()
showard21baa452008-10-21 00:08:39 +00002099
2100
    def epilog(self):
        # Normal completion path: base epilog, then final result bookkeeping.
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002104
2105
class QueueTask(AbstractQueueTask):
    """Runs a job's main autoserv process against its host queue entries."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def _check_entry_sanity(self, entry):
        # Refuse to launch unless both the entry and its host are in a
        # state from which starting the job makes sense.
        valid_entry_statuses = (models.HostQueueEntry.Status.STARTING,
                                models.HostQueueEntry.Status.RUNNING)
        if entry.status not in valid_entry_statuses:
            raise SchedulerError('Queue task attempting to start '
                                 'entry with invalid status %s: %s'
                                 % (entry.status, entry))
        valid_host_statuses = (models.Host.Status.PENDING,
                               models.Host.Status.RUNNING)
        if entry.host.status not in valid_host_statuses:
            raise SchedulerError('Queue task attempting to start on queue '
                                 'entry with invalid host status %s: %s'
                                 % (entry.host.status, entry))


    def prolog(self):
        for entry in self.queue_entries:
            self._check_entry_sanity(entry)

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()
        # Hand the entries off to the log-gathering stage.
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002142
2143
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after a job's main autoserv process has
    exited (e.g. log gathering, final reparse).  Attaches a
    PidfileRunMonitor to the existing autoserv pidfile so the job's exit
    status can be read when computing the final queue entry status.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # Monitor for the (finished) autoserv process; consulted by
        # _final_status() to read its exit code.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command to run, given the job's results directory."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if this job's queue entries were aborted."""
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                # BUG FIX: previously join() was applied to a single formatted
                # string (joining its characters); format every entry instead.
                entries_text = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: '
                        + entries_text)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Compute the terminal HostQueueEntry status for this job."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2214
2215
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = [entry.host.hostname for entry in self.queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo',
                '-m', ','.join(hostnames), '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return _CRASHINFO_PID_FILE


    def prolog(self):
        # Sanity-check every entry before starting crashinfo collection.
        for entry in self.queue_entries:
            if entry.status != models.HostQueueEntry.Status.GATHERING:
                raise SchedulerError('Gather task attempting to start on '
                                     'non-gathering entry: %s' % entry)
            if entry.host.status != models.Host.Status.RUNNING:
                raise SchedulerError('Gather task attempting to start on queue '
                                     'entry with non-running host status %s: %s'
                                     % (entry.host.status, entry))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self.queue_entries,
                                     use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def _should_reboot(self, final_success, num_tests_failed):
        """Decide whether hosts get a cleanup, per the job's reboot_after."""
        # always reboot after aborted jobs
        if self._final_status() == models.HostQueueEntry.Status.ABORTED:
            return True
        reboot_after = self._job.reboot_after
        if reboot_after == models.RebootAfter.ALWAYS:
            return True
        return (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                and final_success and num_tests_failed == 0)


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        do_reboot = self._should_reboot(final_success, num_tests_failed)
        for entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                entry.host.set_status(models.Host.Status.READY)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is None or os.WIFSIGNALED(exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002305
2306
class CleanupTask(PreJobTask):
    """Special task that runs autoserv --cleanup against a host."""
    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        # Only applies when cleanup succeeded on behalf of a queue entry.
        if not self.queue_entry or not self.success:
            return

        do_not_verify = host_protections.Protection.DO_NOT_VERIFY
        if (self.queue_entry.job.run_verify
                and self.host.protection != do_not_verify):
            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
            models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=self.host.id),
                    queue_entry=entry,
                    task=models.SpecialTask.Task.VERIFY)
        else:
            self.queue_entry.on_pending()


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002350
showard21baa452008-10-21 00:08:39 +00002351
class FinalReparseTask(PostJobTask):
    """Runs the TKO parser over a finished job's results.

    A class-level counter throttles how many parser processes run at once
    (bounded by scheduler_config.config.max_parse_processes).
    """
    _num_running_parses = 0

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return _PARSER_PID_FILE


    def _parse_started(self):
        return bool(self.monitor)


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def prolog(self):
        for entry in self.queue_entries:
            if entry.status != models.HostQueueEntry.Status.PARSING:
                raise SchedulerError('Parse task attempting to start on '
                                     'non-parsing entry: %s' % entry)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status())


    def tick(self):
        # override tick to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started():
            super(FinalReparseTask, self).tick()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()
        self._increment_running_parses()


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        if self._parse_started():
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002437
2438
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for an entry with no associated host ('hostless' job)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _final_status(self):
        entry = self.queue_entries[0]
        if entry.aborted:
            return models.HostQueueEntry.Status.ABORTED
        if self.monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(self._final_status())
2461
2462
class DBError(Exception):
    """Raised by the DBObject constructor when its database select fails."""
2465
2466
class DBObject(object):
    """A miniature object relational model for the database.

    Subclasses map rows of a single table to Python objects.  Instances
    sharing the same (type, id) are reused via a weak-value cache so that,
    e.g., many HostQueueEntry objects share a single Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # Note: constructor arguments are deliberately not forwarded to
        # object.__new__(), which does not accept them (they were silently
        # ignored before; newer Pythons reject them outright).
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key of the row to represent; fetched from the
                database unless row is supplied.
        @param row - A pre-fetched row (sequence ordered as _fields) to
                initialize from, avoiding a query.
        @param new_record - True if this instance represents a row that does
                not yet exist in the database (save() will INSERT it).
        @param always_query - If False and this shared instance was already
                initialized, skip re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return our row from the database; raise DBError if
        the row does not exist."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # Guards against schema/_fields mismatches before attribute updates.
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.  Fractional seconds are stripped from datetime
        values before comparison.

        @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in zip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in zip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is the primary key; it is never updated through update_field().
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read all field values from this object's database row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in `table` (default: our table)
        matching the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set a single field both in memory and in the database, skipping
        the UPDATE when the value is unchanged."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's row if it was constructed with
        new_record=True, then adopt the database-assigned id."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this object's row and evict it from the instance cache."""
        # BUG FIX: this previously popped (type(self), id) using the
        # *builtin* id function, so deleted instances were never actually
        # evicted from the cache.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string prefixed with prefix, or '' if string is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002672
mbligh36768f02008-02-22 18:28:33 +00002673
class IneligibleHostQueue(DBObject):
    # ORM wrapper for one row of afe_ineligible_host_queues: a
    # (job_id, host_id) pairing.
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002677
2678
class AtomicGroup(DBObject):
    # ORM wrapper for one row of afe_atomic_groups.
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002683
2684
class Label(DBObject):
    """ORM wrapper for one row of afe_labels."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        details = (self.name, self.id, self.atomic_group_id)
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % details
2694
2695
class Host(DBObject):
    """ORM wrapper for one row of afe_hosts."""
    _table_name = 'afe_hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Update this host's status field, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT afe_labels.name, afe_labels.platform
                FROM afe_labels
                INNER JOIN afe_hosts_labels ON
                        afe_labels.id = afe_hosts_labels.label_id
                WHERE afe_hosts_labels.host_id = %s
                ORDER BY afe_labels.name
                """, (self.id,))
        platform = None
        labels = []
        for name, is_platform in rows:
            labels.append(name)
            if is_platform:
                platform = name
        return platform, labels


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        Hostnames of the form <name><digits> are compared by the name part
        and the numeric value of the digit part (so leading zeros are
        ignored: host2 < host09 < host010 == host10 numerically, with the
        lower-case string comparison breaking such near-ties).  Hostnames
        not matching that pattern are compared as lower case strings.

        Example ordering:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if not (match_a and match_b):
            # At least one name doesn't fit <name><digits>; plain compare.
            return cmp(lower_a, lower_b)
        name_a, digits_a = match_a.groups()
        name_b, digits_b = match_b.groups()
        result = cmp((name_a, int(digits_a.lstrip('0'))),
                     (name_b, int(digits_b.lstrip('0'))))
        if result == 0 and lower_a != lower_b:
            # If they compared equal above but the lower case names are
            # indeed different, don't report equality. abc012 != abc12.
            return cmp(lower_a, lower_b)
        return result
2766
2767
class HostQueueEntry(DBObject):
    """In-memory wrapper for one afe_host_queue_entries row.

    A HostQueueEntry ties one Job to (at most) one Host and tracks the
    pairing's status, active/complete flags, execution subdirectory and
    abort information.  Status changes performed here are written straight
    back to the database via update_field().
    """
    _table_name = 'afe_host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')


    def __init__(self, id=None, row=None, **kwargs):
        """Load an entry by id or raw DB row, along with its related
        Job, Host and AtomicGroup objects."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        # A meta-host entry has no concrete host until one is assigned.
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # Per-entry log file kept under the job's results tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Return a frontend URL for viewing this entry's job."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def get_labels(self):
        """
        Get all labels associated with this host queue entry (either via the
        meta_host or as a job dependency label). The labels yielded are not
        guaranteed to be unique.

        @yields Label instances associated with this host_queue_entry.
        """
        if self.meta_host:
            yield Label(id=self.meta_host, always_query=False)
        labels = Label.fetch(
                joins="JOIN afe_jobs_dependency_labels AS deps "
                      "ON (afe_labels.id = deps.label_id)",
                where="deps.job_id = %d" % self.job.id)
        for label in labels:
            yield label


    def set_host(self, host):
        """Assign (or release, when host is None) this entry's host.

        Assigning also marks the entry active and blocks the host against
        other entries of the same job; releasing removes that block.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Create an IneligibleHostQueue row preventing this job from
        being scheduled on host_id again."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Delete any IneligibleHostQueue rows for (this job, host_id)."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdirectory; defaults to the host's hostname."""
        if subdir is None:
            assert self.host
            subdir = self.host.hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Return the assigned host's name, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        """Human-readable summary: hostname/job-id (entry-id) status [flags]."""
        flags = []
        if self.active:
            flags.append('active')
        if self.complete:
            flags.append('complete')
        if self.deleted:
            flags.append('deleted')
        if self.aborted:
            flags.append('aborted')
        flags_str = ','.join(flags)
        if flags_str:
            flags_str = ' [%s]' % flags_str
        return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
                                    self.status, flags_str)


    def set_status(self, status):
        """Set this entry's status and keep the active/complete flags in
        sync; fires completion handling and email notifications."""
        logging.info("%s -> %s", self, status)

        self.update_field('status', status)

        # Not-yet-running states: neither active nor complete.
        if status in (models.HostQueueEntry.Status.QUEUED,
                      models.HostQueueEntry.Status.PARSING):
            self.update_field('complete', False)
            self.update_field('active', False)

        # In-progress states: active, not complete.
        if status in (models.HostQueueEntry.Status.PENDING,
                      models.HostQueueEntry.Status.RUNNING,
                      models.HostQueueEntry.Status.VERIFYING,
                      models.HostQueueEntry.Status.STARTING,
                      models.HostQueueEntry.Status.GATHERING):
            self.update_field('complete', False)
            self.update_field('active', True)

        # Terminal states: complete, not active.
        if status in (models.HostQueueEntry.Status.FAILED,
                      models.HostQueueEntry.Status.COMPLETED,
                      models.HostQueueEntry.Status.STOPPED,
                      models.HostQueueEntry.Status.ABORTED):
            self.update_field('complete', True)
            self.update_field('active', False)
            self._on_complete()

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _on_complete(self):
        """Post-completion bookkeeping: maybe stop the job's remaining
        entries and unregister this entry's pidfiles."""
        self.job.stop_if_necessary()
        if not self.execution_subdir:
            return
        # unregister any possible pidfiles associated with this queue entry
        for pidfile_name in _ALL_PIDFILE_NAMES:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                    self.execution_path(), pidfile_name=pidfile_name)
            _drone_manager.unregister_pidfile(pidfile_id)


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job has finished, email a per-host summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def schedule_pre_job_tasks(self, assigned_host=None):
        """Bind the entry to its host (for meta-host/atomic-group entries)
        and kick off the pre-job task flow.

        @param assigned_host: required when this entry is a meta-host or
                atomic-group entry that may not have a host yet.
        """
        if self.meta_host is not None or self.atomic_group:
            assert assigned_host
            # ensure results dir exists for the queue log
            if self.host_id is None:
                self.set_host(assigned_host)
            else:
                assert assigned_host.id == self.host_id

        logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.job.id, self.id, self.host.hostname, self.status)

        self._do_schedule_pre_job_tasks()


    def _do_schedule_pre_job_tasks(self):
        # Every host goes thru the Verifying stage (which may or may not
        # actually do anything as determined by get_pre_job_tasks).
        self.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.job.schedule_pre_job_tasks(queue_entry=self)


    def requeue(self):
        """Return this entry to the Queued state so it can be scheduled
        again; meta-host entries also release their host."""
        assert self.host
        self.set_status(models.HostQueueEntry.Status.QUEUED)
        self.update_field('started_on', None)
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    @property
    def aborted_by(self):
        # Login of the user who aborted this entry, or None.
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # Timestamp of the abort, or None.
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached: only hit the DB the first time either property is read.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT afe_users.login,
                        afe_aborted_host_queue_entries.aborted_on
                FROM afe_aborted_host_queue_entries
                INNER JOIN afe_users
                ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
                WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, sets the entries to STARTING. Otherwise, it leaves
        them in PENDING.
        """
        self.set_status(models.HostQueueEntry.Status.PENDING)
        self.host.set_status(models.Host.Status.PENDING)

        # Some debug code here: sends an email if an asynchronous job does not
        # immediately enter Starting.
        # TODO: Remove this once we figure out why asynchronous jobs are getting
        # stuck in Pending.
        self.job.run_if_ready(queue_entry=self)
        if (self.job.synch_count == 1 and
                self.status == models.HostQueueEntry.Status.PENDING):
            subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
            message = 'Asynchronous job stuck in Pending'
            email_manager.manager.enqueue_notify_email(subject, message)


    def abort(self, dispatcher):
        """Abort this entry, releasing or cleaning up its host depending on
        how far the entry had progressed.

        @param dispatcher: used to assert no agents are still working on
                this entry before the host is released.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        if self.status in (Status.GATHERING, Status.PARSING):
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
            assert not dispatcher.get_agents_for_entry(self)
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            # The host may be mid-verify; schedule a cleanup for it.
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=self.host.id),
                    requested_by=self.job.owner_model())

        self.set_status(Status.ABORTED)
        self.job.abort_delay_ready_task()


    def get_group_name(self):
        """Return the display group name for atomic-group entries
        (empty string for non-atomic entries)."""
        atomic_group = self.atomic_group
        if not atomic_group:
            return ''

        # Look at any meta_host and dependency labels and pick the first
        # one that also specifies this atomic group.  Use that label name
        # as the group name if possible (it is more specific).
        for label in self.get_labels():
            if label.atomic_group_id:
                assert label.atomic_group_id == atomic_group.id
                return label.name
        return atomic_group.name


    def execution_tag(self):
        """Return '<job tag>/<execution subdir>'; requires the subdir set."""
        assert self.execution_subdir
        return "%s/%s" % (self.job.tag(), self.execution_subdir)


    def execution_path(self):
        """Results path for this entry (same as its execution tag)."""
        return self.execution_tag()


    def set_started_on_now(self):
        """Stamp started_on with the current time."""
        self.update_field('started_on', datetime.datetime.now())


    def is_hostless(self):
        """True when the entry has no host, meta-host or atomic group."""
        return (self.host_id is None
                and self.meta_host is None
                and self.atomic_group_id is None)
3116
3117
class Job(DBObject):
    """In-memory wrapper for one afe_jobs row, plus scheduling helpers."""
    _table_name = 'afe_jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003139
    def __init__(self, id=None, row=None, **kwargs):
        # Exactly one of id/row must identify the database record to load.
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)
        self._owner_model = None # caches model instance of owner
3144
3145
3146 def owner_model(self):
3147 # work around the fact that the Job owner field is a string, not a
3148 # foreign key
3149 if not self._owner_model:
3150 self._owner_model = models.User.objects.get(login=self.owner)
3151 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003152
mblighe2586682008-02-29 22:45:46 +00003153
jadmanski0afbb632008-06-06 21:10:57 +00003154 def is_server_job(self):
3155 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003156
3157
showard170873e2009-01-07 00:22:26 +00003158 def tag(self):
3159 return "%s-%s" % (self.id, self.owner)
3160
3161
jadmanski0afbb632008-06-06 21:10:57 +00003162 def get_host_queue_entries(self):
3163 rows = _db.execute("""
showardeab66ce2009-12-23 00:03:56 +00003164 SELECT * FROM afe_host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +00003165 WHERE job_id= %s
3166 """, (self.id,))
3167 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003168
jadmanski0afbb632008-06-06 21:10:57 +00003169 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003170
jadmanski0afbb632008-06-06 21:10:57 +00003171 return entries
mbligh36768f02008-02-22 18:28:33 +00003172
3173
jadmanski0afbb632008-06-06 21:10:57 +00003174 def set_status(self, status, update_queues=False):
3175 self.update_field('status',status)
3176
3177 if update_queues:
3178 for queue_entry in self.get_host_queue_entries():
3179 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003180
3181
showard77182562009-06-10 00:16:05 +00003182 def _atomic_and_has_started(self):
3183 """
3184 @returns True if any of the HostQueueEntries associated with this job
3185 have entered the Status.STARTING state or beyond.
3186 """
3187 atomic_entries = models.HostQueueEntry.objects.filter(
3188 job=self.id, atomic_group__isnull=False)
3189 if atomic_entries.count() <= 0:
3190 return False
3191
showardaf8b4ca2009-06-16 18:47:26 +00003192 # These states may *only* be reached if Job.run() has been called.
3193 started_statuses = (models.HostQueueEntry.Status.STARTING,
3194 models.HostQueueEntry.Status.RUNNING,
3195 models.HostQueueEntry.Status.COMPLETED)
3196
3197 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003198 return started_entries.count() > 0
3199
3200
showard708b3522009-08-20 23:26:15 +00003201 def _hosts_assigned_count(self):
3202 """The number of HostQueueEntries assigned a Host for this job."""
3203 entries = models.HostQueueEntry.objects.filter(job=self.id,
3204 host__isnull=False)
3205 return entries.count()
3206
3207
showard77182562009-06-10 00:16:05 +00003208 def _pending_count(self):
3209 """The number of HostQueueEntries for this job in the Pending state."""
3210 pending_entries = models.HostQueueEntry.objects.filter(
3211 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3212 return pending_entries.count()
3213
3214
showardd07a5f32009-12-07 19:36:20 +00003215 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003216 """
3217 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003218 are using to set an upper bound on the threshold.
3219 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003220 this job can run.
3221 """
3222 return min(self._hosts_assigned_count(),
3223 atomic_group.max_number_of_machines)
3224
3225
    def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
        return self.synch_count
3229
3230
jadmanski0afbb632008-06-06 21:10:57 +00003231 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003232 # NOTE: Atomic group jobs stop reporting ready after they have been
3233 # started to avoid launching multiple copies of one atomic job.
3234 # Only possible if synch_count is less than than half the number of
3235 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003236 pending_count = self._pending_count()
3237 atomic_and_has_started = self._atomic_and_has_started()
3238 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003239 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003240
3241 if not ready:
3242 logging.info(
3243 'Job %s not ready: %s pending, %s required '
3244 '(Atomic and started: %s)',
3245 self, pending_count, self.synch_count,
3246 atomic_and_has_started)
3247
3248 return ready
mbligh36768f02008-02-22 18:28:33 +00003249
3250
jadmanski0afbb632008-06-06 21:10:57 +00003251 def num_machines(self, clause = None):
3252 sql = "job_id=%s" % self.id
3253 if clause:
3254 sql += " AND (%s)" % clause
showardeab66ce2009-12-23 00:03:56 +00003255 return self.count(sql, table='afe_host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003256
3257
    def num_queued(self):
        # Entries that have not completed yet (queued or still in progress).
        return self.num_machines('not complete')


    def num_active(self):
        # Entries currently flagged active in the database.
        return self.num_machines('active')


    def num_complete(self):
        # Entries that have reached a terminal state.
        return self.num_machines('complete')


    def is_finished(self):
        # The job is finished once every one of its entries is complete.
        return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003272
mbligh36768f02008-02-22 18:28:33 +00003273
showard6bb7c292009-01-30 01:44:51 +00003274 def _not_yet_run_entries(self, include_verifying=True):
3275 statuses = [models.HostQueueEntry.Status.QUEUED,
3276 models.HostQueueEntry.Status.PENDING]
3277 if include_verifying:
3278 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3279 return models.HostQueueEntry.objects.filter(job=self.id,
3280 status__in=statuses)
3281
3282
3283 def _stop_all_entries(self):
3284 entries_to_stop = self._not_yet_run_entries(
3285 include_verifying=False)
3286 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003287 assert not child_entry.complete, (
3288 '%s status=%s, active=%s, complete=%s' %
3289 (child_entry.id, child_entry.status, child_entry.active,
3290 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003291 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3292 child_entry.host.status = models.Host.Status.READY
3293 child_entry.host.save()
3294 child_entry.status = models.HostQueueEntry.Status.STOPPED
3295 child_entry.save()
3296
showard2bab8f42008-11-12 18:15:22 +00003297 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003298 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003299 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003300 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003301
3302
jadmanski0afbb632008-06-06 21:10:57 +00003303 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003304 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003305 file_path = os.path.join(self.tag(), '.machines')
3306 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003307
3308
showardf1ae3542009-05-11 19:26:02 +00003309 def _next_group_name(self, group_name=''):
3310 """@returns a directory name to use for the next host group results."""
3311 if group_name:
3312 # Sanitize for use as a pathname.
3313 group_name = group_name.replace(os.path.sep, '_')
3314 if group_name.startswith('.'):
3315 group_name = '_' + group_name[1:]
3316 # Add a separator between the group name and 'group%d'.
3317 group_name += '.'
3318 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003319 query = models.HostQueueEntry.objects.filter(
3320 job=self.id).values('execution_subdir').distinct()
3321 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003322 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3323 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003324 if ids:
3325 next_id = max(ids) + 1
3326 else:
3327 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003328 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003329
3330
showarddb502762009-09-09 15:31:20 +00003331 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003332 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003333 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003334 return control_path
mbligh36768f02008-02-22 18:28:33 +00003335
showardb2e2c322008-10-14 17:33:55 +00003336
showard2bab8f42008-11-12 18:15:22 +00003337 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003338 """
3339 @param queue_entry_from_group: A HostQueueEntry instance to find other
3340 group entries on this job for.
3341
3342 @returns A list of HostQueueEntry objects all executing this job as
3343 part of the same group as the one supplied (having the same
3344 execution_subdir).
3345 """
showard2bab8f42008-11-12 18:15:22 +00003346 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003347 return list(HostQueueEntry.fetch(
3348 where='job_id=%s AND execution_subdir=%s',
3349 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003350
3351
    def get_autoserv_params(self, queue_entries):
        """
        Build the autoserv command line used to run this job.

        @param queue_entries: Non-empty list of HostQueueEntry objects that
                will run together; they share one execution directory.

        @returns A list of command-line arguments for autoserv.
        """
        assert queue_entries
        # All entries in the group share an execution directory, so the
        # first entry's paths stand in for the whole group.
        execution_path = queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no machine to the hosts argument.
        hostnames = ','.join(entry.host.hostname
                             for entry in queue_entries
                             if not entry.is_hostless())

        execution_tag = queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        # Client-side jobs need autoserv's -c flag.
        if not self.is_server_job():
            params.append('-c')

        return params
mblighe2586682008-02-29 22:45:46 +00003371
mbligh36768f02008-02-22 18:28:33 +00003372
showardc9ae1782009-01-30 01:42:37 +00003373 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003374 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003375 return True
showard0fc38302008-10-23 00:44:07 +00003376 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003377 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003378 return False
showard21baa452008-10-21 00:08:39 +00003379
showardc9ae1782009-01-30 01:42:37 +00003380
showard8cc058f2009-09-08 16:26:33 +00003381 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003382 do_not_verify = (queue_entry.host.protection ==
3383 host_protections.Protection.DO_NOT_VERIFY)
3384 if do_not_verify:
3385 return False
3386 return self.run_verify
3387
3388
showard8cc058f2009-09-08 16:26:33 +00003389 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003390 """
3391 Get a list of tasks to perform before the host_queue_entry
3392 may be used to run this Job (such as Cleanup & Verify).
3393
3394 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003395 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003396 task in the list calls HostQueueEntry.on_pending(), which
3397 continues the flow of the job.
3398 """
showardc9ae1782009-01-30 01:42:37 +00003399 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003400 task = models.SpecialTask.Task.CLEANUP
3401 elif self._should_run_verify(queue_entry):
3402 task = models.SpecialTask.Task.VERIFY
3403 else:
3404 queue_entry.on_pending()
3405 return
3406
showard9bb960b2009-11-19 01:02:11 +00003407 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003408 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003409 host=models.Host.objects.get(id=queue_entry.host_id),
3410 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003411
3412
showardf1ae3542009-05-11 19:26:02 +00003413 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003414 if len(queue_entries) == 1:
showarda9545c02009-12-18 22:44:26 +00003415 group_subdir_name = queue_entries[0].host.hostname
showard2bab8f42008-11-12 18:15:22 +00003416 else:
showardf1ae3542009-05-11 19:26:02 +00003417 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003418 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003419 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003420 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003421
3422 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003423 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003424
3425
    def _choose_group_to_run(self, include_queue_entry):
        """
        Gather the group of Pending HostQueueEntries to run this Job on.

        @param include_queue_entry: HostQueueEntry that must be part of the
                chosen group (the entry that triggered this run attempt).

        @returns A list of HostQueueEntry instances to be used to run this
                Job (each with its execution subdir already assigned), or an
                empty list — after logging and emailing a warning — if fewer
                than synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # An atomic group job may fill up to the group's machine limit.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            # Pull this job's other Pending entries from the database,
            # excluding the one we already have.
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003465
3466
showard77182562009-06-10 00:16:05 +00003467 def run_if_ready(self, queue_entry):
3468 """
showard8375ce02009-10-12 20:35:13 +00003469 Run this job by kicking its HQEs into status='Starting' if enough
3470 hosts are ready for it to run.
3471
3472 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3473 ready to run.
showard77182562009-06-10 00:16:05 +00003474 """
showardb2e2c322008-10-14 17:33:55 +00003475 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003476 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003477 elif queue_entry.atomic_group:
3478 self.run_with_ready_delay(queue_entry)
3479 else:
3480 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003481
3482
3483 def run_with_ready_delay(self, queue_entry):
3484 """
3485 Start a delay to wait for more hosts to enter Pending state before
3486 launching an atomic group job. Once set, the a delay cannot be reset.
3487
3488 @param queue_entry: The HostQueueEntry object to get atomic group
3489 info from and pass to run_if_ready when the delay is up.
3490
3491 @returns An Agent to run the job as appropriate or None if a delay
3492 has already been set.
3493 """
3494 assert queue_entry.job_id == self.id
3495 assert queue_entry.atomic_group
3496 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003497 over_max_threshold = (self._pending_count() >=
showardd07a5f32009-12-07 19:36:20 +00003498 self._max_hosts_needed_to_run(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003499 delay_expired = (self._delay_ready_task and
3500 time.time() >= self._delay_ready_task.end_time)
3501
3502 # Delay is disabled or we already have enough? Do not wait to run.
3503 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003504 self.run(queue_entry)
3505 else:
3506 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003507
showard8cc058f2009-09-08 16:26:33 +00003508
showardd07a5f32009-12-07 19:36:20 +00003509 def request_abort(self):
3510 """Request that this Job be aborted on the next scheduler cycle."""
3511 queue_entries = HostQueueEntry.fetch(where="job_id=%s" % self.id)
3512 for hqe in queue_entries:
3513 hqe.update_field('aborted', True)
3514
3515
showard8cc058f2009-09-08 16:26:33 +00003516 def schedule_delayed_callback_task(self, queue_entry):
3517 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3518
showard77182562009-06-10 00:16:05 +00003519 if self._delay_ready_task:
3520 return None
3521
showard8cc058f2009-09-08 16:26:33 +00003522 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3523
showard77182562009-06-10 00:16:05 +00003524 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003525 logging.info('Job %s done waiting for extra hosts.', self)
3526 # Check to see if the job is still relevant. It could have aborted
3527 # while we were waiting or hosts could have disappearred, etc.
showardd07a5f32009-12-07 19:36:20 +00003528 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003529 logging.info('Job %s had too few Pending hosts after waiting '
3530 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003531 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003532 return
showard77182562009-06-10 00:16:05 +00003533 return self.run(queue_entry)
3534
showard708b3522009-08-20 23:26:15 +00003535 logging.info('Job %s waiting up to %s seconds for more hosts.',
3536 self.id, delay)
showard77182562009-06-10 00:16:05 +00003537 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3538 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003539 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003540
3541
3542 def run(self, queue_entry):
3543 """
3544 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003545 """
3546 if queue_entry.atomic_group and self._atomic_and_has_started():
3547 logging.error('Job.run() called on running atomic Job %d '
3548 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003549 return
3550 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003551 if queue_entries:
3552 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003553
3554
showard8cc058f2009-09-08 16:26:33 +00003555 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003556 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003557 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003558 self.abort_delay_ready_task()
3559
3560
3561 def abort_delay_ready_task(self):
3562 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003563 if self._delay_ready_task:
3564 # Cancel any pending callback that would try to run again
3565 # as we are already running.
3566 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003567
showardd2014822009-10-12 20:26:58 +00003568
showardb000a8d2009-07-28 20:02:07 +00003569 def __str__(self):
3570 return '%s-%s' % (self.id, self.owner)
3571
3572
# Script entry point.  main() is defined earlier in this file (outside this
# view) and runs the scheduler's main loop.
if __name__ == '__main__':
    main()