blob: eb21a56273e3f58acd1f7be3f2ca7a9ad245ffcd [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000024from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000025from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000026from autotest_lib.scheduler import status_server, scheduler_config
showardf13a9e22009-12-18 22:54:09 +000027from autotest_lib.scheduler import gc_stats
mbligh70feeee2008-06-11 16:20:49 +000028
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results dir; overwritten from argv in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An AUTOTEST_DIR environment variable overrides the path derived above.
# (Use the `in` operator; dict.has_key() is deprecated in Python 2.x.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# Pidfile names written into execution directories by the different
# process stages (autoserv run, crashinfo collection, parsing, archiving).
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'
_ARCHIVER_PID_FILE = '.archiver_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE, _ARCHIVER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, populated during startup; see init() and
# main_without_exception_handling().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
69
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
75
76
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def main():
    """Top-level entry point.

    Runs the scheduler, logs any exception that escapes, and always removes
    the pid file written by init() on the way out.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. config errors) propagate untouched.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        # Clean up the pid file created via utils.write_pid() in init().
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000092
93
def main_without_exception_handling():
    """Parse arguments, configure module globals, and run the dispatch loop.

    Expects exactly one positional argument: the results directory.  Reads
    the global config for scheduler enablement, email notification statuses
    and the AFE base URL, starts the status server, then ticks the
    Dispatcher until a SIGINT flips the module-level _shutdown flag.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Bug fix: message previously read "to enabled it".
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow a site-specific hook to run before anything else.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown: flush pending mail, stop the RPC server, shut down
    # drones, then drop the database connection.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000181
182
def setup_logging():
    """Configure scheduler logging, honoring env-var overrides for the
    log directory and file name."""
    env = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR'),
        logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME'))
189
190
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit of the main dispatch loop.

    Only sets the module-level _shutdown flag; the loop in
    main_without_exception_handling() notices it after the current tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000195
196
def init():
    """One-time scheduler startup.

    Guards against a second running instance via the pid file, connects the
    database (and Django), installs the SIGINT handler, and initializes the
    drone manager from config.  Must run before the first Dispatcher tick.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another monitor_db owns the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local `drones` shadows the `drones` module imported
    # at the top of the file; safe here since the module isn't used below,
    # but a rename would be clearer.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000231
232
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The full argv list for invoking autoserv.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        # Fall back to the queue entry's job when none was given explicitly.
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
257
258
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent scheduling state occurs."""
261
262
class HostScheduler(object):
    """Matches ready hosts to pending host queue entries.

    refresh() snapshots scheduling state from the database (available hosts,
    job/host ACLs, labels, dependencies) into in-memory dicts; the
    find_eligible_* methods then consume that cached state, popping hosts
    from _hosts_available as they are handed out so each host is assigned at
    most once per scheduling cycle.
    """

    def _get_ready_hosts(self):
        """@returns {host_id: Host} for unlocked Ready (or status-less)
        hosts that have no active queue entry against them."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render ids as a comma-separated string for use in SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (its %s filled with id_list) and fold the (left, right)
        rows into a many-to-many dict.

        @param query - SQL selecting two id columns, with one %s placeholder.
        @param id_list - ids substituted into the query; empty list -> {}.
        @param flip - if True, map right_id -> set(left_ids) instead.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """@returns {left_id: set(right_ids)} from (left, right) rows,
        or the flipped mapping when flip=True."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns {job_id: set(aclgroup_ids)} via each job owner's
        ACL-group memberships."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns {job_id: set(host_ids)} the job must not run on."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns {job_id: set(label_ids)} of dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns {host_id: set(aclgroup_ids)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns a pair ({label_id: set(host_ids)},
        {host_id: set(label_ids)}) built from one query."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns {label_id: Label} for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state for this cycle.

        @param pending_queue_entries - the HQEs we may schedule this cycle;
                only their jobs' ACL/dependency/ineligibility data is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring, this logs and returns an arbitrary
            # one of the groups rather than raising.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host passes ACL, dependency, only_if_needed and atomic
        group checks for this queue entry."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host row is marked invalid (deleted via frontend)."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's explicitly-requested host, if still eligible."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first usable, eligible host carrying the metahost label,
        or None if none remains."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """@returns a claimed Host for this entry, or None.  Dispatches to
        the metahost or non-metahost path; atomic group entries must use
        find_eligible_atomic_group() instead."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
628
629
showard170873e2009-01-07 00:22:26 +0000630class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Presumably map host/queue-entry ids to the agents working on them;
        # the maintaining code is not in view here -- TODO confirm.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # 'gc_stats_interval_mins' from config (default 6 hours), stored
        # in seconds for comparison against time.time() deltas.
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000647
mbligh36768f02008-02-22 18:28:33 +0000648
showard915958d2009-04-22 21:00:58 +0000649 def initialize(self, recover_hosts=True):
650 self._periodic_cleanup.initialize()
651 self._24hr_upkeep.initialize()
652
jadmanski0afbb632008-06-06 21:10:57 +0000653 # always recover processes
654 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000655
jadmanski0afbb632008-06-06 21:10:57 +0000656 if recover_hosts:
657 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000658
659
jadmanski0afbb632008-06-06 21:10:57 +0000660 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000661 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000662 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000663 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000664 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000665 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000666 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000667 self._schedule_running_host_queue_entries()
668 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000669 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000670 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000671 _drone_manager.execute_actions()
672 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000673 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000674 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000675
showard97aed502008-11-04 02:01:24 +0000676
mblighf3294cc2009-04-08 21:17:38 +0000677 def _run_cleanup(self):
678 self._periodic_cleanup.run_cleanup_maybe()
679 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000680
mbligh36768f02008-02-22 18:28:33 +0000681
showardf13a9e22009-12-18 22:54:09 +0000682 def _garbage_collection(self):
683 threshold_time = time.time() - self._seconds_between_garbage_stats
684 if threshold_time < self._last_garbage_stats_time:
685 # Don't generate these reports very often.
686 return
687
688 self._last_garbage_stats_time = time.time()
689 # Force a full level 0 collection (because we can, it doesn't hurt
690 # at this interval).
691 gc.collect()
692 logging.info('Logging garbage collector stats on tick %d.',
693 self._tick_count)
694 gc_stats._log_garbage_collector_stats()
695
696
showard170873e2009-01-07 00:22:26 +0000697 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
698 for object_id in object_ids:
699 agent_dict.setdefault(object_id, set()).add(agent)
700
701
702 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
703 for object_id in object_ids:
704 assert object_id in agent_dict
705 agent_dict[object_id].remove(agent)
706
707
showardd1195652009-12-08 22:21:02 +0000708 def add_agent_task(self, agent_task):
709 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000710 self._agents.append(agent)
711 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000712 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
713 self._register_agent_for_ids(self._queue_entry_agents,
714 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000715
showard170873e2009-01-07 00:22:26 +0000716
717 def get_agents_for_entry(self, queue_entry):
718 """
719 Find agents corresponding to the specified queue_entry.
720 """
showardd3dc1992009-04-22 21:01:40 +0000721 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000722
723
724 def host_has_agent(self, host):
725 """
726 Determine if there is currently an Agent present using this host.
727 """
728 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000729
730
jadmanski0afbb632008-06-06 21:10:57 +0000731 def remove_agent(self, agent):
732 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000733 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
734 agent)
735 self._unregister_agent_for_ids(self._queue_entry_agents,
736 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000737
738
showard8cc058f2009-09-08 16:26:33 +0000739 def _host_has_scheduled_special_task(self, host):
740 return bool(models.SpecialTask.objects.filter(host__id=host.id,
741 is_active=False,
742 is_complete=False))
743
744
jadmanski0afbb632008-06-06 21:10:57 +0000745 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000746 agent_tasks = self._create_recovery_agent_tasks()
747 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000748 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000749 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000750 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000751 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000752 self._reverify_remaining_hosts()
753 # reinitialize drones after killing orphaned processes, since they can
754 # leave around files when they die
755 _drone_manager.execute_actions()
756 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000757
showard170873e2009-01-07 00:22:26 +0000758
showardd1195652009-12-08 22:21:02 +0000759 def _create_recovery_agent_tasks(self):
760 return (self._get_queue_entry_agent_tasks()
761 + self._get_special_task_agent_tasks(is_active=True))
762
763
764 def _get_queue_entry_agent_tasks(self):
765 # host queue entry statuses handled directly by AgentTasks (Verifying is
766 # handled through SpecialTasks, so is not listed here)
767 statuses = (models.HostQueueEntry.Status.STARTING,
768 models.HostQueueEntry.Status.RUNNING,
769 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000770 models.HostQueueEntry.Status.PARSING,
771 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000772 status_list = ','.join("'%s'" % status for status in statuses)
showard170873e2009-01-07 00:22:26 +0000773 queue_entries = HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000774 where='status IN (%s)' % status_list)
775
776 agent_tasks = []
777 used_queue_entries = set()
778 for entry in queue_entries:
779 if self.get_agents_for_entry(entry):
780 # already being handled
781 continue
782 if entry in used_queue_entries:
783 # already picked up by a synchronous job
784 continue
785 agent_task = self._get_agent_task_for_queue_entry(entry)
786 agent_tasks.append(agent_task)
787 used_queue_entries.update(agent_task.queue_entries)
788 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000789
790
showardd1195652009-12-08 22:21:02 +0000791 def _get_special_task_agent_tasks(self, is_active=False):
792 special_tasks = models.SpecialTask.objects.filter(
793 is_active=is_active, is_complete=False)
794 return [self._get_agent_task_for_special_task(task)
795 for task in special_tasks]
796
797
798 def _get_agent_task_for_queue_entry(self, queue_entry):
799 """
800 Construct an AgentTask instance for the given active HostQueueEntry,
801 if one can currently run it.
802 @param queue_entry: a HostQueueEntry
803 @returns an AgentTask to run the queue entry
804 """
805 task_entries = queue_entry.job.get_group_entries(queue_entry)
806 self._check_for_duplicate_host_entries(task_entries)
807
808 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
809 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000810 if queue_entry.is_hostless():
811 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000812 return QueueTask(queue_entries=task_entries)
813 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
814 return GatherLogsTask(queue_entries=task_entries)
815 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
816 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000817 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
818 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000819
820 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
821 'invalid status %s: %s' % (entry.status, entry))
822
823
824 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000825 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
826 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000827 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000828 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000829 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000830 if using_host:
showardd1195652009-12-08 22:21:02 +0000831 self._assert_host_has_no_agent(task_entry)
832
833
834 def _assert_host_has_no_agent(self, entry):
835 """
836 @param entry: a HostQueueEntry or a SpecialTask
837 """
838 if self.host_has_agent(entry.host):
839 agent = tuple(self._host_agents.get(entry.host.id))[0]
840 raise SchedulerError(
841 'While scheduling %s, host %s already has a host agent %s'
842 % (entry, entry.host, agent.task))
843
844
845 def _get_agent_task_for_special_task(self, special_task):
846 """
847 Construct an AgentTask class to run the given SpecialTask and add it
848 to this dispatcher.
849 @param special_task: a models.SpecialTask instance
850 @returns an AgentTask to run this SpecialTask
851 """
852 self._assert_host_has_no_agent(special_task)
853
854 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
855 for agent_task_class in special_agent_task_classes:
856 if agent_task_class.TASK_TYPE == special_task.task:
857 return agent_task_class(task=special_task)
858
859 raise SchedulerError('No AgentTask class for task', str(special_task))
860
861
862 def _register_pidfiles(self, agent_tasks):
863 for agent_task in agent_tasks:
864 agent_task.register_necessary_pidfiles()
865
866
867 def _recover_tasks(self, agent_tasks):
868 orphans = _drone_manager.get_orphaned_autoserv_processes()
869
870 for agent_task in agent_tasks:
871 agent_task.recover()
872 if agent_task.monitor and agent_task.monitor.has_process():
873 orphans.discard(agent_task.monitor.get_process())
874 self.add_agent_task(agent_task)
875
876 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000877
878
showard8cc058f2009-09-08 16:26:33 +0000879 def _get_unassigned_entries(self, status):
880 for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
showard0db3d432009-10-12 20:29:15 +0000881 if entry.status == status and not self.get_agents_for_entry(entry):
882 # The status can change during iteration, e.g., if job.run()
883 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000884 yield entry
885
886
showard6878e8b2009-07-20 22:37:45 +0000887 def _check_for_remaining_orphan_processes(self, orphans):
888 if not orphans:
889 return
890 subject = 'Unrecovered orphan autoserv processes remain'
891 message = '\n'.join(str(process) for process in orphans)
892 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000893
894 die_on_orphans = global_config.global_config.get_config_value(
895 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
896
897 if die_on_orphans:
898 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000899
showard170873e2009-01-07 00:22:26 +0000900
showard8cc058f2009-09-08 16:26:33 +0000901 def _recover_pending_entries(self):
902 for entry in self._get_unassigned_entries(
903 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000904 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000905 entry.on_pending()
906
907
showardb8900452009-10-12 20:31:01 +0000908 def _check_for_unrecovered_verifying_entries(self):
showard170873e2009-01-07 00:22:26 +0000909 queue_entries = HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000910 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
911 unrecovered_hqes = []
912 for queue_entry in queue_entries:
913 special_tasks = models.SpecialTask.objects.filter(
914 task__in=(models.SpecialTask.Task.CLEANUP,
915 models.SpecialTask.Task.VERIFY),
916 queue_entry__id=queue_entry.id,
917 is_complete=False)
918 if special_tasks.count() == 0:
919 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000920
showardb8900452009-10-12 20:31:01 +0000921 if unrecovered_hqes:
922 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000923 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000924 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000925 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000926
927
showard65db3932009-10-28 19:54:35 +0000928 def _get_prioritized_special_tasks(self):
929 """
930 Returns all queued SpecialTasks prioritized for repair first, then
931 cleanup, then verify.
932 """
933 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
934 is_complete=False,
935 host__locked=False)
936 # exclude hosts with active queue entries unless the SpecialTask is for
937 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000938 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000939 queued_tasks, 'afe_host_queue_entries', 'host_id',
940 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000941 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000942 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000943 where=['(afe_host_queue_entries.id IS NULL OR '
944 'afe_host_queue_entries.id = '
945 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000946
showard65db3932009-10-28 19:54:35 +0000947 # reorder tasks by priority
948 task_priority_order = [models.SpecialTask.Task.REPAIR,
949 models.SpecialTask.Task.CLEANUP,
950 models.SpecialTask.Task.VERIFY]
951 def task_priority_key(task):
952 return task_priority_order.index(task.task)
953 return sorted(queued_tasks, key=task_priority_key)
954
955
showard65db3932009-10-28 19:54:35 +0000956 def _schedule_special_tasks(self):
957 """
958 Execute queued SpecialTasks that are ready to run on idle hosts.
959 """
960 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000961 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000962 continue
showardd1195652009-12-08 22:21:02 +0000963 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000964
965
showard170873e2009-01-07 00:22:26 +0000966 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000967 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000968 # should never happen
showarded2afea2009-07-07 20:54:07 +0000969 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000970 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000971 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000972 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000973 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000974
975
jadmanski0afbb632008-06-06 21:10:57 +0000976 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000977 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000978 full_where='locked = 0 AND invalid = 0 AND ' + where
979 for host in Host.fetch(where=full_where):
980 if self.host_has_agent(host):
981 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000982 continue
showard8cc058f2009-09-08 16:26:33 +0000983 if self._host_has_scheduled_special_task(host):
984 # host will have a special task scheduled on the next cycle
985 continue
showard170873e2009-01-07 00:22:26 +0000986 if print_message:
showardb18134f2009-03-20 20:52:18 +0000987 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +0000988 models.SpecialTask.objects.create(
989 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +0000990 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +0000991
992
jadmanski0afbb632008-06-06 21:10:57 +0000993 def _recover_hosts(self):
994 # recover "Repair Failed" hosts
995 message = 'Reverifying dead host %s'
996 self._reverify_hosts_where("status = 'Repair Failed'",
997 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000998
999
showard04c82c52008-05-29 19:38:12 +00001000
showardb95b1bd2008-08-15 18:11:04 +00001001 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001002 # prioritize by job priority, then non-metahost over metahost, then FIFO
1003 return list(HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001004 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001005 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001006 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001007
1008
showard89f84db2009-03-12 20:39:13 +00001009 def _refresh_pending_queue_entries(self):
1010 """
1011 Lookup the pending HostQueueEntries and call our HostScheduler
1012 refresh() method given that list. Return the list.
1013
1014 @returns A list of pending HostQueueEntries sorted in priority order.
1015 """
showard63a34772008-08-18 19:32:50 +00001016 queue_entries = self._get_pending_queue_entries()
1017 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001018 return []
showardb95b1bd2008-08-15 18:11:04 +00001019
showard63a34772008-08-18 19:32:50 +00001020 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001021
showard89f84db2009-03-12 20:39:13 +00001022 return queue_entries
1023
1024
1025 def _schedule_atomic_group(self, queue_entry):
1026 """
1027 Schedule the given queue_entry on an atomic group of hosts.
1028
1029 Returns immediately if there are insufficient available hosts.
1030
1031 Creates new HostQueueEntries based off of queue_entry for the
1032 scheduled hosts and starts them all running.
1033 """
1034 # This is a virtual host queue entry representing an entire
1035 # atomic group, find a group and schedule their hosts.
1036 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1037 queue_entry)
1038 if not group_hosts:
1039 return
showardcbe6f942009-06-17 19:33:49 +00001040
1041 logging.info('Expanding atomic group entry %s with hosts %s',
1042 queue_entry,
1043 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001044 # The first assigned host uses the original HostQueueEntry
1045 group_queue_entries = [queue_entry]
1046 for assigned_host in group_hosts[1:]:
1047 # Create a new HQE for every additional assigned_host.
1048 new_hqe = HostQueueEntry.clone(queue_entry)
1049 new_hqe.save()
1050 group_queue_entries.append(new_hqe)
1051 assert len(group_queue_entries) == len(group_hosts)
1052 for queue_entry, host in itertools.izip(group_queue_entries,
1053 group_hosts):
1054 self._run_queue_entry(queue_entry, host)
1055
1056
showarda9545c02009-12-18 22:44:26 +00001057 def _schedule_hostless_job(self, queue_entry):
1058 self.add_agent_task(HostlessQueueTask(queue_entry))
1059
1060
showard89f84db2009-03-12 20:39:13 +00001061 def _schedule_new_jobs(self):
1062 queue_entries = self._refresh_pending_queue_entries()
1063 if not queue_entries:
1064 return
1065
showard63a34772008-08-18 19:32:50 +00001066 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001067 is_unassigned_atomic_group = (
1068 queue_entry.atomic_group_id is not None
1069 and queue_entry.host_id is None)
1070 if is_unassigned_atomic_group:
1071 self._schedule_atomic_group(queue_entry)
showarda9545c02009-12-18 22:44:26 +00001072 elif queue_entry.is_hostless():
1073 self._schedule_hostless_job(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001074 else:
showard89f84db2009-03-12 20:39:13 +00001075 assigned_host = self._host_scheduler.find_eligible_host(
1076 queue_entry)
showard65db3932009-10-28 19:54:35 +00001077 if assigned_host and not self.host_has_agent(assigned_host):
showard89f84db2009-03-12 20:39:13 +00001078 self._run_queue_entry(queue_entry, assigned_host)
showardb95b1bd2008-08-15 18:11:04 +00001079
1080
showard8cc058f2009-09-08 16:26:33 +00001081 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001082 for agent_task in self._get_queue_entry_agent_tasks():
1083 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001084
1085
1086 def _schedule_delay_tasks(self):
showardd2014822009-10-12 20:26:58 +00001087 for entry in HostQueueEntry.fetch(where='status = "%s"' %
1088 models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001089 task = entry.job.schedule_delayed_callback_task(entry)
1090 if task:
showardd1195652009-12-08 22:21:02 +00001091 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001092
1093
showardb95b1bd2008-08-15 18:11:04 +00001094 def _run_queue_entry(self, queue_entry, host):
showard8cc058f2009-09-08 16:26:33 +00001095 queue_entry.schedule_pre_job_tasks(assigned_host=host)
mblighd5c95802008-03-05 00:33:46 +00001096
1097
jadmanski0afbb632008-06-06 21:10:57 +00001098 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001099 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001100 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001101 for agent in self.get_agents_for_entry(entry):
1102 agent.abort()
1103 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001104
1105
showard324bf812009-01-20 23:23:38 +00001106 def _can_start_agent(self, agent, num_started_this_cycle,
1107 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001108 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001109 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001110 return True
1111 # don't allow any nonzero-process agents to run after we've reached a
1112 # limit (this avoids starvation of many-process agents)
1113 if have_reached_limit:
1114 return False
1115 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001116 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001117 agent.task.owner_username)
1118 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001119 return False
1120 # if a single agent exceeds the per-cycle throttling, still allow it to
1121 # run when it's the first agent in the cycle
1122 if num_started_this_cycle == 0:
1123 return True
1124 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001125 if (num_started_this_cycle + agent.task.num_processes >
1126 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001127 return False
1128 return True
1129
1130
jadmanski0afbb632008-06-06 21:10:57 +00001131 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001132 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001133 have_reached_limit = False
1134 # iterate over copy, so we can remove agents during iteration
1135 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001136 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001137 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001138 have_reached_limit):
1139 have_reached_limit = True
1140 continue
showardd1195652009-12-08 22:21:02 +00001141 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001142 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001143 if agent.is_done():
1144 logging.info("agent finished")
1145 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001146 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001147 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001148
1149
showard29f7cd22009-04-29 21:16:24 +00001150 def _process_recurring_runs(self):
1151 recurring_runs = models.RecurringRun.objects.filter(
1152 start_date__lte=datetime.datetime.now())
1153 for rrun in recurring_runs:
1154 # Create job from template
1155 job = rrun.job
1156 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001157 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001158
1159 host_objects = info['hosts']
1160 one_time_hosts = info['one_time_hosts']
1161 metahost_objects = info['meta_hosts']
1162 dependencies = info['dependencies']
1163 atomic_group = info['atomic_group']
1164
1165 for host in one_time_hosts or []:
1166 this_host = models.Host.create_one_time_host(host.hostname)
1167 host_objects.append(this_host)
1168
1169 try:
1170 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001171 options=options,
showard29f7cd22009-04-29 21:16:24 +00001172 host_objects=host_objects,
1173 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001174 atomic_group=atomic_group)
1175
1176 except Exception, ex:
1177 logging.exception(ex)
1178 #TODO send email
1179
1180 if rrun.loop_count == 1:
1181 rrun.delete()
1182 else:
1183 if rrun.loop_count != 0: # if not infinite loop
1184 # calculate new start_date
1185 difference = datetime.timedelta(seconds=rrun.loop_period)
1186 rrun.start_date = rrun.start_date + difference
1187 rrun.loop_count -= 1
1188 rrun.save()
1189
1190
showard170873e2009-01-07 00:22:26 +00001191class PidfileRunMonitor(object):
1192 """
1193 Client must call either run() to start a new process or
1194 attach_to_existing_process().
1195 """
mbligh36768f02008-02-22 18:28:33 +00001196
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).  It is always
        caught by _get_pidfile_info(), which converts it into a lost-process
        state via _handle_pidfile_error().
        """
mbligh36768f02008-02-22 18:28:33 +00001202
1203
showard170873e2009-01-07 00:22:26 +00001204 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001205 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001206 self._start_time = None
1207 self.pidfile_id = None
1208 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001209
1210
showard170873e2009-01-07 00:22:26 +00001211 def _add_nice_command(self, command, nice_level):
1212 if not nice_level:
1213 return command
1214 return ['nice', '-n', str(nice_level)] + command
1215
1216
1217 def _set_start_time(self):
1218 self._start_time = time.time()
1219
1220
showard418785b2009-11-23 20:19:59 +00001221 def run(self, command, working_directory, num_processes, nice_level=None,
1222 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1223 username=None):
showard170873e2009-01-07 00:22:26 +00001224 assert command is not None
1225 if nice_level is not None:
1226 command = ['nice', '-n', str(nice_level)] + command
1227 self._set_start_time()
1228 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001229 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001230 num_processes=num_processes, log_file=log_file,
1231 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001232
1233
showarded2afea2009-07-07 20:54:07 +00001234 def attach_to_existing_process(self, execution_path,
showardd1195652009-12-08 22:21:02 +00001235 pidfile_name=_AUTOSERV_PID_FILE,
1236 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001237 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001238 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001239 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001240 if num_processes is not None:
1241 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001242
1243
jadmanski0afbb632008-06-06 21:10:57 +00001244 def kill(self):
showard170873e2009-01-07 00:22:26 +00001245 if self.has_process():
1246 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001247
mbligh36768f02008-02-22 18:28:33 +00001248
showard170873e2009-01-07 00:22:26 +00001249 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001250 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001251 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001252
1253
showard170873e2009-01-07 00:22:26 +00001254 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001255 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001256 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001257 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001258
1259
showard170873e2009-01-07 00:22:26 +00001260 def _read_pidfile(self, use_second_read=False):
1261 assert self.pidfile_id is not None, (
1262 'You must call run() or attach_to_existing_process()')
1263 contents = _drone_manager.get_pidfile_contents(
1264 self.pidfile_id, use_second_read=use_second_read)
1265 if contents.is_invalid():
1266 self._state = drone_manager.PidfileContents()
1267 raise self._PidfileException(contents)
1268 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001269
1270
showard21baa452008-10-21 00:08:39 +00001271 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001272 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1273 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001274 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001275 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001276
1277
1278 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001279 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001280 return
mblighbb421852008-03-11 22:36:16 +00001281
showard21baa452008-10-21 00:08:39 +00001282 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001283
showard170873e2009-01-07 00:22:26 +00001284 if self._state.process is None:
1285 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001286 return
mbligh90a549d2008-03-25 23:52:34 +00001287
showard21baa452008-10-21 00:08:39 +00001288 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001289 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001290 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001291 return
mbligh90a549d2008-03-25 23:52:34 +00001292
showard170873e2009-01-07 00:22:26 +00001293 # pid but no running process - maybe process *just* exited
1294 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001295 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001296 # autoserv exited without writing an exit code
1297 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001298 self._handle_pidfile_error(
1299 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001300
showard21baa452008-10-21 00:08:39 +00001301
1302 def _get_pidfile_info(self):
1303 """\
1304 After completion, self._state will contain:
1305 pid=None, exit_status=None if autoserv has not yet run
1306 pid!=None, exit_status=None if autoserv is running
1307 pid!=None, exit_status!=None if autoserv has completed
1308 """
1309 try:
1310 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001311 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001312 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001313
1314
showard170873e2009-01-07 00:22:26 +00001315 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001316 """\
1317 Called when no pidfile is found or no pid is in the pidfile.
1318 """
showard170873e2009-01-07 00:22:26 +00001319 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001320 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001321 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001322 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001323 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001324
1325
showard35162b02009-03-03 02:17:30 +00001326 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001327 """\
1328 Called when autoserv has exited without writing an exit status,
1329 or we've timed out waiting for autoserv to write a pid to the
1330 pidfile. In either case, we just return failure and the caller
1331 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001332
showard170873e2009-01-07 00:22:26 +00001333 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001334 """
1335 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001336 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001337 self._state.exit_status = 1
1338 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001339
1340
jadmanski0afbb632008-06-06 21:10:57 +00001341 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001342 self._get_pidfile_info()
1343 return self._state.exit_status
1344
1345
1346 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001347 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001348 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001349 if self._state.num_tests_failed is None:
1350 return -1
showard21baa452008-10-21 00:08:39 +00001351 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001352
1353
showardcdaeae82009-08-31 18:32:48 +00001354 def try_copy_results_on_drone(self, **kwargs):
1355 if self.has_process():
1356 # copy results logs into the normal place for job results
1357 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1358
1359
1360 def try_copy_to_results_repository(self, source, **kwargs):
1361 if self.has_process():
1362 _drone_manager.copy_to_results_repository(self.get_process(),
1363 source, **kwargs)
1364
1365
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # Filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        if not self.task:
            return
        self.task.abort()
        # tasks can choose to ignore aborts
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001419
showardd3dc1992009-04-22 21:01:40 +00001420
class DelayedCallTask(object):
    """
    A task object (in the sense of the Agent class) that waits until the
    supplied delay has elapsed and then invokes the given callback exactly
    once before finishing.

    NOTE(review): earlier docs claimed a value returned by the callback
    would be added to the dispatcher as a new Agent, but poll() discards
    the callback's return value -- confirm against the dispatcher code.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes the Agent class expects on every task.
        self.aborted = False
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.num_processes = 0


    def poll(self):
        if self.is_done():
            return
        if self._now_func() < self.end_time:
            return
        self._callback()
        self.success = True


    def is_done(self):
        return self.aborted or self.success


    def abort(self):
        self.aborted = True
showard77182562009-06-10 00:16:05 +00001472
1473
class AgentTask(object):
    """
    Base class for tasks run by an Agent on behalf of the Dispatcher.

    An AgentTask normally wraps one external process (autoserv or a
    results-processing tool) that is launched and tracked through the
    global _drone_manager via a PidfileRunMonitor.

    Lifecycle: poll() calls start() (prolog() + run()) on the first call,
    then tick() on later calls until the monitored process exits, at which
    point finished() records success and invokes epilog().

    Subclasses must override _command_line(), _working_directory() and
    owner_username.
    """

    class _NullMonitor(object):
        # Placeholder "paired" monitor used when a task has no predecessor
        # process; has_process() always succeeds so paired-results checks
        # trivially pass.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None  # PidfileRunMonitor; set by run() or recover()
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        # Record the host/queue-entry ids this task touches; the Dispatcher
        # uses these to route aborts to the right agents.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # Lazily start on the first poll, then drive the task forward.
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        # A None exit code means the process is still running.
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run epilog(); idempotent."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.  Runs once before the process is launched; the
        base implementation registers the pidfiles this task will need.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the command-output log, or None when not configured.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        # Preserve the command-output log with the results, when we have
        # both a monitor and a configured log file.
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.  Runs once when the task completes (successfully
        or not); the base implementation copies logs and reports the result.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)



    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries handled by a single task must share one execution
        # directory; assert that invariant and return the path.
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) PidfileRunMonitor to copy from;
                defaults to self.monitor
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries to the parsing stage of the results pipeline.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        # Hand the entries to the archiving stage of the results pipeline.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return _AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        # If the process we are paired with never produced results, there is
        # nothing for this task to work on; notify and fail immediately.
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                'No paired results in task',
                'No paired results in task %s at %s'
                % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
            self._command_line(), self._working_directory(),
            num_processes=self.num_processes,
            nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
            pidfile_name=self._pidfile_name(),
            paired_with_pidfile=self._paired_with_monitor().pidfile_id,
            username=self.owner_username)


    def register_necessary_pidfiles(self):
        # Register our own pidfile -- and the paired process's, if any --
        # with the drone manager so their contents get refreshed each cycle.
        pidfile_id = _drone_manager.get_pidfile_id_from(
            self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Reattach to a process left over from a previous scheduler run."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
            self._working_directory(), pidfile_name=self._pidfile_name(),
            num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """Sanity-check queue entry and host statuses before starting;
        raises SchedulerError on any mismatch."""
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            invalid_host_status = (
                allowed_host_statuses is not None
                and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))
1743
1744
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval line in the key=value form the parser expects."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # Bug fix: this previously did `raise NotImplemented`, which raises
        # a TypeError (NotImplemented is a constant, not an exception class)
        # instead of the intended NotImplementedError.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the keyval file, paired with the
        running process's drone; no-op if the process was never found."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job-finished timestamp keyval."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path in the execution dir."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and label keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1794
1795
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask DB row this AgentTask executes.
        @param extra_command_args: extra arguments for the autoserv command
                line (e.g. ['-v'] for verify).
        """
        super(SpecialAgentTask, self).__init__()

        # Bug fix: this was `assert (cond, msg)` -- asserting a non-empty
        # tuple, which is always true, so the check never fired.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = HostQueueEntry(id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Mark this task's queue entry as failed, writing job keyvals and
        handing results off to parsing/archiving as appropriate."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
            source_path=self._working_directory() + '/',
            destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
            self.queue_entry.execution_path(),
            pidfile_name=_AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001890
1891
class RepairTask(SpecialAgentTask):
    """Runs `autoserv -R` against a host to try to restore it to READY."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the SpecialTask DB row; task.queue_entry, if present,
                is the queue entry to mark failed if this repair fails.
        """
        raw_protection = host_protections.Protection.get_string(
            task.host.protection)
        # normalize the protection name
        protection_name = host_protections.Protection.get_attr_name(
            raw_protection)

        extra_args = ['-R', '--host-protection', protection_name]
        super(RepairTask, self).__init__(task, extra_args)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001928
1929
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run on a host before a job (verify,
    cleanup).  Provides shared failure handling: the pre-job debug log is
    copied next to the job results and, unless the host is protected with
    DO_NOT_VERIFY, a repair task is created for the host.
    """
    def _copy_to_results_repository(self):
        # Only non-metahost entries have a fixed execution directory to
        # copy into; metahost entries get reassigned to another host.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        # Failure path: preserve the debug log alongside the job results.
        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            # Put the entry back in the queue so it can be rescheduled.
            self.queue_entry.requeue()

            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                # A repair tied to this entry already exists and we still
                # failed, so give up on the entry entirely.
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        # Schedule a repair for the host, linked to the entry if we have one.
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001979
showard8fe93b52008-11-18 17:53:22 +00001980
class VerifyTask(PreJobTask):
    """Runs `autoserv -v` to check that a host is usable before a job."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host is enough; delete any other queued verifies
        # for this host rather than keeping records of the extra requests.
        redundant_verifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002015
2016
class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        # recover_run_monitor is accepted for interface compatibility but
        # is not used by this implementation.
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup tied to a queue entry, either queue a
        follow-up verify task or advance the entry straight to pending."""
        if not self.queue_entry or not self.success:
            return

        no_verify_protection = host_protections.Protection.DO_NOT_VERIFY
        verify_wanted = (self.queue_entry.job.run_verify
                         and self.host.protection != no_verify_protection)
        if not verify_wanted:
            self.queue_entry.on_pending()
            return

        entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
            host=models.Host.objects.get(id=self.host.id),
            queue_entry=entry,
            task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2064
2065
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask

    Runs autoserv for a group of queue entries belonging to a single job,
    writes job keyvals before/after the run, and records abort information.
    """
    def __init__(self, queue_entries):
        # all queue entries are assumed to belong to the same job
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # keyvals live alongside the job's results
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        # the job knows how to build its own autoserv invocation
        return self.job.get_autoserv_params(self.queue_entries)


    @property
    def num_processes(self):
        # one autoserv process slot per queue entry
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        # all entries must share one execution path; asserts if they don't
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all entries as RUNNING."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # leave a marker in the results dir so the loss is visible to users
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job-finished keyvals; no-op if autoserv never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log, paired with the
        # autoserv process so it lands on the right drone
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002176
2177
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for a job that runs against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        # mark every involved host as running and dirty, and record its
        # label/platform keyvals
        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)

        single_machine_job = (self.job.synch_count == 1
                              and len(self.queue_entries) == 1)
        if single_machine_job:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002209
2210
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a job that runs without any host."""

    def __init__(self, queue_entry):
        # a hostless job always has exactly one queue entry
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # there is no host to name the execution subdir after, so use a
        # fixed name
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # no gathering/cleanup phases apply; go straight to parsing
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002225
2226
class PostJobTask(AgentTask):
    """Base class for tasks that run over a job's results after autoserv
    has exited (log gathering, parsing, archiving).

    Attaches a PidfileRunMonitor to the finished autoserv process so
    subclasses can inspect its exit status.
    """
    def __init__(self, queue_entries, log_file_name):
        """
        @param queue_entries: HostQueueEntries sharing one execution path.
        @param log_file_name: name for this task's log file in the results.
        """
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command list to run against results_dir."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if the job's queue entries were aborted.

        All entries are expected to agree; if they don't, a warning email
        is enqueued and the job is treated as aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: previously this joined the *characters* of a
                # single formatted string; format every entry instead.
                entries = ['%s (%s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: '
                        + ', '.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the autoserv outcome to a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2302
2303
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = [entry.host.hostname for entry in self.queue_entries]
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', ','.join(hostnames), '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return _CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))
        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _autoserv_outcome(self):
        # Returns (final_success, num_tests_failed) for the autoserv run.
        # If autoserv never got a process at all, count it as a failure.
        if not self._autoserv_monitor.has_process():
            return False, 0
        success = (self._final_status() ==
                   models.HostQueueEntry.Status.COMPLETED)
        return success, self._autoserv_monitor.num_tests_failed()


    def _reboot_hosts(self):
        final_success, num_tests_failed = self._autoserv_outcome()

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        if do_reboot:
            for queue_entry in self.queue_entries:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
        else:
            for queue_entry in self.queue_entries:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
            return
        super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002389
2390
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.

    The limit comes from _max_processes() (overridden by subclasses); the
    count of running processes is kept on the class itself.
    """
    # number of processes currently running for this task type
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if not self._process_started():
            self._try_starting_process()
            return
        super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # actually run the command
            super(SelfThrottledPostJobTask, self).run()
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002449
showard21baa452008-10-21 00:08:39 +00002450
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the results parser over a finished job, throttled globally."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        command = [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o']
        command.append(results_dir)
        return command


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return _PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002489
2490
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archiving control file over a job's results, throttled
    globally."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return _ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', control_path]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
        super(ArchiveResultsTask, self).prolog()


    def _write_failure_file(self):
        # leave a marker file in the results dir so the failure is visible
        failed_file = os.path.join(self._working_directory(),
                                   self._ARCHIVING_FAILED_FILE)
        _drone_manager.write_lines_to_file(
                failed_file,
                ['Archiving failed with exit code %s'
                 % self.monitor.exit_code()],
                paired_with_process=self._paired_with_monitor().get_process())


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            self._write_failure_file()
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002537
2538
class DBError(Exception):
    """Signals a failed database load: raised by the DBObject constructor
    when its SELECT matches no row."""
    pass
2541
2542
class DBObject(object):
    """A miniature object relational model for the database.

    Each instance mirrors one row of the subclass's _table_name; column
    values are exposed as attributes named after _fields.  Instances are
    cached per (class, id) so that, e.g., many HostQueueEntry objects
    sharing one Job reuse a single Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id: primary key of the row to load (queried if row is None).
        @param row: a full row tuple, as returned by SELECT *.
        @param new_record: True if this object does not yet exist in the DB.
        @param always_query: if False and this (cached) instance was already
                initialized, skip re-querying the database.
        """
        assert bool(id) or bool(row)
        if id is not None and row is not None:
            assert id == row[0]
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return our row; raises DBError if it doesn't exist."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing in
        memory fields. Fractional seconds are stripped from datetime values
        before comparison.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S'  # Leave off the microseconds.
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if (isinstance(current_value, datetime.datetime)
                and isinstance(row_value, datetime.datetime)):
                current_value = current_value.strftime(datetime_cmp_fmt)
                row_value = row_value.strftime(datetime_cmp_fmt)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is not a settable field; it's only readable
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the database and refresh all fields."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Write a single field to the database (no-op if unchanged)."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object if it was created with new_record=True."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and evict this instance from the cache."""
        # Bug fix: the cache key must use self.id -- previously this used the
        # builtin id() function, so deleted rows were never evicted.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # prepend prefix only when string is non-empty
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @returns A list with one class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002748
mbligh36768f02008-02-22 18:28:33 +00002749
class IneligibleHostQueue(DBObject):
    """DBObject for rows of afe_ineligible_host_queues, which pair a job_id
    with a host_id."""
    _table_name = 'afe_ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002753
2754
class AtomicGroup(DBObject):
    """DBObject for rows of the afe_atomic_groups table."""
    _table_name = 'afe_atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002759
2760
class Label(DBObject):
    """DBObject for rows of the afe_labels table."""
    _table_name = 'afe_labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group."""
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2770
2771
mbligh36768f02008-02-22 18:28:33 +00002772class Host(DBObject):
showardeab66ce2009-12-23 00:03:56 +00002773 _table_name = 'afe_hosts'
showard6ae5ea92009-02-25 00:11:51 +00002774 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2775 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2776
2777
jadmanski0afbb632008-06-06 21:10:57 +00002778 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002779 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002780 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002781
2782
showard170873e2009-01-07 00:22:26 +00002783 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002784 """
showard170873e2009-01-07 00:22:26 +00002785 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002786 """
2787 rows = _db.execute("""
showardeab66ce2009-12-23 00:03:56 +00002788 SELECT afe_labels.name, afe_labels.platform
2789 FROM afe_labels
2790 INNER JOIN afe_hosts_labels ON
2791 afe_labels.id = afe_hosts_labels.label_id
2792 WHERE afe_hosts_labels.host_id = %s
2793 ORDER BY afe_labels.name
showardd8e548a2008-09-09 03:04:57 +00002794 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002795 platform = None
2796 all_labels = []
2797 for label_name, is_platform in rows:
2798 if is_platform:
2799 platform = label_name
2800 all_labels.append(label_name)
2801 return platform, all_labels
2802
2803
showard54c1ea92009-05-20 00:32:58 +00002804 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2805
2806
2807 @classmethod
2808 def cmp_for_sort(cls, a, b):
2809 """
2810 A comparison function for sorting Host objects by hostname.
2811
2812 This strips any trailing numeric digits, ignores leading 0s and
2813 compares hostnames by the leading name and the trailing digits as a
2814 number. If both hostnames do not match this pattern, they are simply
2815 compared as lower case strings.
2816
2817 Example of how hostnames will be sorted:
2818
2819 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2820
2821 This hopefully satisfy most people's hostname sorting needs regardless
2822 of their exact naming schemes. Nobody sane should have both a host10
2823 and host010 (but the algorithm works regardless).
2824 """
2825 lower_a = a.hostname.lower()
2826 lower_b = b.hostname.lower()
2827 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2828 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2829 if match_a and match_b:
2830 name_a, number_a_str = match_a.groups()
2831 name_b, number_b_str = match_b.groups()
2832 number_a = int(number_a_str.lstrip('0'))
2833 number_b = int(number_b_str.lstrip('0'))
2834 result = cmp((name_a, number_a), (name_b, number_b))
2835 if result == 0 and lower_a != lower_b:
2836 # If they compared equal above but the lower case names are
2837 # indeed different, don't report equality. abc012 != abc12.
2838 return cmp(lower_a, lower_b)
2839 return result
2840 else:
2841 return cmp(lower_a, lower_b)
2842
2843
mbligh36768f02008-02-22 18:28:33 +00002844class HostQueueEntry(DBObject):
showardeab66ce2009-12-23 00:03:56 +00002845 _table_name = 'afe_host_queue_entries'
showard6ae5ea92009-02-25 00:11:51 +00002846 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002847 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002848 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002849
2850
showarda3c58572009-03-12 20:36:59 +00002851 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002852 assert id or row
showarda3c58572009-03-12 20:36:59 +00002853 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002854 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002855
jadmanski0afbb632008-06-06 21:10:57 +00002856 if self.host_id:
2857 self.host = Host(self.host_id)
2858 else:
2859 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002860
showard77182562009-06-10 00:16:05 +00002861 if self.atomic_group_id:
2862 self.atomic_group = AtomicGroup(self.atomic_group_id,
2863 always_query=False)
2864 else:
2865 self.atomic_group = None
2866
showard170873e2009-01-07 00:22:26 +00002867 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002868 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002869
2870
showard89f84db2009-03-12 20:39:13 +00002871 @classmethod
2872 def clone(cls, template):
2873 """
2874 Creates a new row using the values from a template instance.
2875
2876 The new instance will not exist in the database or have a valid
2877 id attribute until its save() method is called.
2878 """
2879 assert isinstance(template, cls)
2880 new_row = [getattr(template, field) for field in cls._fields]
2881 clone = cls(row=new_row, new_record=True)
2882 clone.id = None
2883 return clone
2884
2885
showardc85c21b2008-11-24 22:17:37 +00002886 def _view_job_url(self):
2887 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2888
2889
showardf1ae3542009-05-11 19:26:02 +00002890 def get_labels(self):
2891 """
2892 Get all labels associated with this host queue entry (either via the
2893 meta_host or as a job dependency label). The labels yielded are not
2894 guaranteed to be unique.
2895
2896 @yields Label instances associated with this host_queue_entry.
2897 """
2898 if self.meta_host:
2899 yield Label(id=self.meta_host, always_query=False)
2900 labels = Label.fetch(
showardeab66ce2009-12-23 00:03:56 +00002901 joins="JOIN afe_jobs_dependency_labels AS deps "
2902 "ON (afe_labels.id = deps.label_id)",
showardf1ae3542009-05-11 19:26:02 +00002903 where="deps.job_id = %d" % self.job.id)
2904 for label in labels:
2905 yield label
2906
2907
jadmanski0afbb632008-06-06 21:10:57 +00002908 def set_host(self, host):
2909 if host:
2910 self.queue_log_record('Assigning host ' + host.hostname)
2911 self.update_field('host_id', host.id)
2912 self.update_field('active', True)
2913 self.block_host(host.id)
2914 else:
2915 self.queue_log_record('Releasing host')
2916 self.unblock_host(self.host.id)
2917 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002918
jadmanski0afbb632008-06-06 21:10:57 +00002919 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002920
2921
jadmanski0afbb632008-06-06 21:10:57 +00002922 def queue_log_record(self, log_line):
2923 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002924 _drone_manager.write_lines_to_file(self.queue_log_path,
2925 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002926
2927
jadmanski0afbb632008-06-06 21:10:57 +00002928 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002929 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002930 row = [0, self.job.id, host_id]
2931 block = IneligibleHostQueue(row=row, new_record=True)
2932 block.save()
mblighe2586682008-02-29 22:45:46 +00002933
2934
jadmanski0afbb632008-06-06 21:10:57 +00002935 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002936 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002937 blocks = IneligibleHostQueue.fetch(
2938 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2939 for block in blocks:
2940 block.delete()
mblighe2586682008-02-29 22:45:46 +00002941
2942
showard2bab8f42008-11-12 18:15:22 +00002943 def set_execution_subdir(self, subdir=None):
2944 if subdir is None:
showarda9545c02009-12-18 22:44:26 +00002945 assert self.host
2946 subdir = self.host.hostname
showard2bab8f42008-11-12 18:15:22 +00002947 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002948
2949
showard6355f6b2008-12-05 18:52:13 +00002950 def _get_hostname(self):
2951 if self.host:
2952 return self.host.hostname
2953 return 'no host'
2954
2955
showard170873e2009-01-07 00:22:26 +00002956 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002957 flags = []
2958 if self.active:
2959 flags.append('active')
2960 if self.complete:
2961 flags.append('complete')
2962 if self.deleted:
2963 flags.append('deleted')
2964 if self.aborted:
2965 flags.append('aborted')
2966 flags_str = ','.join(flags)
2967 if flags_str:
2968 flags_str = ' [%s]' % flags_str
2969 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2970 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002971
2972
jadmanski0afbb632008-06-06 21:10:57 +00002973 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002974 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002975
showard56824072009-10-12 20:30:21 +00002976 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002977
mbligh4608b002010-01-05 18:22:35 +00002978 active = (status in models.HostQueueEntry.ACTIVE_STATUSES)
2979 complete = (status in models.HostQueueEntry.COMPLETE_STATUSES)
2980 assert not (active and complete)
mbligh36768f02008-02-22 18:28:33 +00002981
mbligh4608b002010-01-05 18:22:35 +00002982 self.update_field('active', active)
2983 self.update_field('complete', complete)
mbligh36768f02008-02-22 18:28:33 +00002984
mbligh4608b002010-01-05 18:22:35 +00002985 if complete:
showardf85a0b72009-10-07 20:48:45 +00002986 self._on_complete()
mbligh4608b002010-01-05 18:22:35 +00002987 self._email_on_job_complete()
showardc85c21b2008-11-24 22:17:37 +00002988
2989 should_email_status = (status.lower() in _notify_email_statuses or
2990 'all' in _notify_email_statuses)
2991 if should_email_status:
2992 self._email_on_status(status)
2993
showardc85c21b2008-11-24 22:17:37 +00002994
showardf85a0b72009-10-07 20:48:45 +00002995 def _on_complete(self):
showardd1195652009-12-08 22:21:02 +00002996 self.job.stop_if_necessary()
showardf85a0b72009-10-07 20:48:45 +00002997 if not self.execution_subdir:
2998 return
2999 # unregister any possible pidfiles associated with this queue entry
3000 for pidfile_name in _ALL_PIDFILE_NAMES:
3001 pidfile_id = _drone_manager.get_pidfile_id_from(
3002 self.execution_path(), pidfile_name=pidfile_name)
3003 _drone_manager.unregister_pidfile(pidfile_id)
3004
3005
showardc85c21b2008-11-24 22:17:37 +00003006 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00003007 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00003008
3009 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
3010 self.job.id, self.job.name, hostname, status)
3011 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
3012 self.job.id, self.job.name, hostname, status,
3013 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00003014 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00003015
3016
3017 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00003018 if not self.job.is_finished():
3019 return
showard542e8402008-09-19 20:16:18 +00003020
showardc85c21b2008-11-24 22:17:37 +00003021 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00003022 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00003023 for queue_entry in hosts_queue:
3024 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00003025 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00003026 queue_entry.status))
3027
3028 summary_text = "\n".join(summary_text)
3029 status_counts = models.Job.objects.get_status_counts(
3030 [self.job.id])[self.job.id]
3031 status = ', '.join('%d %s' % (count, status) for status, count
3032 in status_counts.iteritems())
3033
3034 subject = 'Autotest: Job ID: %s "%s" %s' % (
3035 self.job.id, self.job.name, status)
3036 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
3037 self.job.id, self.job.name, status, self._view_job_url(),
3038 summary_text)
showard170873e2009-01-07 00:22:26 +00003039 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00003040
3041
showard8cc058f2009-09-08 16:26:33 +00003042 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00003043 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00003044 assert assigned_host
3045 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00003046 if self.host_id is None:
3047 self.set_host(assigned_host)
3048 else:
3049 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00003050
showard2ca64c92009-12-10 21:41:02 +00003051 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00003052 self.job.name, self.meta_host, self.atomic_group_id,
showard2ca64c92009-12-10 21:41:02 +00003053 self.job.id, self.id, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00003054
showard8cc058f2009-09-08 16:26:33 +00003055 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00003056
3057
showard8cc058f2009-09-08 16:26:33 +00003058 def _do_schedule_pre_job_tasks(self):
showard77182562009-06-10 00:16:05 +00003059 # Every host goes thru the Verifying stage (which may or may not
3060 # actually do anything as determined by get_pre_job_tasks).
3061 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00003062 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00003063
showard6ae5ea92009-02-25 00:11:51 +00003064
jadmanski0afbb632008-06-06 21:10:57 +00003065 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00003066 assert self.host
showard8cc058f2009-09-08 16:26:33 +00003067 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00003068 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00003069 # verify/cleanup failure sets the execution subdir, so reset it here
3070 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00003071 if self.meta_host:
3072 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00003073
3074
jadmanskif7fa2cc2008-10-01 14:13:23 +00003075 @property
3076 def aborted_by(self):
3077 self._load_abort_info()
3078 return self._aborted_by
3079
3080
3081 @property
3082 def aborted_on(self):
3083 self._load_abort_info()
3084 return self._aborted_on
3085
3086
3087 def _load_abort_info(self):
3088 """ Fetch info about who aborted the job. """
3089 if hasattr(self, "_aborted_by"):
3090 return
3091 rows = _db.execute("""
showardeab66ce2009-12-23 00:03:56 +00003092 SELECT afe_users.login,
3093 afe_aborted_host_queue_entries.aborted_on
3094 FROM afe_aborted_host_queue_entries
3095 INNER JOIN afe_users
3096 ON afe_users.id = afe_aborted_host_queue_entries.aborted_by_id
3097 WHERE afe_aborted_host_queue_entries.queue_entry_id = %s
jadmanskif7fa2cc2008-10-01 14:13:23 +00003098 """, (self.id,))
3099 if rows:
3100 self._aborted_by, self._aborted_on = rows[0]
3101 else:
3102 self._aborted_by = self._aborted_on = None
3103
3104
showardb2e2c322008-10-14 17:33:55 +00003105 def on_pending(self):
3106 """
3107 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00003108 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
3109 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00003110 """
showard8cc058f2009-09-08 16:26:33 +00003111 self.set_status(models.HostQueueEntry.Status.PENDING)
3112 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00003113
3114 # Some debug code here: sends an email if an asynchronous job does not
3115 # immediately enter Starting.
3116 # TODO: Remove this once we figure out why asynchronous jobs are getting
3117 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00003118 self.job.run_if_ready(queue_entry=self)
3119 if (self.job.synch_count == 1 and
3120 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00003121 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
3122 message = 'Asynchronous job stuck in Pending'
3123 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00003124
3125
showardd3dc1992009-04-22 21:01:40 +00003126 def abort(self, dispatcher):
3127 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00003128
showardd3dc1992009-04-22 21:01:40 +00003129 Status = models.HostQueueEntry.Status
mbligh4608b002010-01-05 18:22:35 +00003130 if self.status in (Status.GATHERING, Status.PARSING, Status.ARCHIVING):
showardd3dc1992009-04-22 21:01:40 +00003131 # do nothing; post-job tasks will finish and then mark this entry
3132 # with status "Aborted" and take care of the host
3133 return
3134
showard8cc058f2009-09-08 16:26:33 +00003135 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
3136 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00003137 self.host.set_status(models.Host.Status.READY)
3138 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00003139 models.SpecialTask.objects.create(
3140 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00003141 host=models.Host.objects.get(id=self.host.id),
3142 requested_by=self.job.owner_model())
showardd3dc1992009-04-22 21:01:40 +00003143
3144 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00003145 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00003146
showard8cc058f2009-09-08 16:26:33 +00003147
3148 def get_group_name(self):
3149 atomic_group = self.atomic_group
3150 if not atomic_group:
3151 return ''
3152
3153 # Look at any meta_host and dependency labels and pick the first
3154 # one that also specifies this atomic group. Use that label name
3155 # as the group name if possible (it is more specific).
3156 for label in self.get_labels():
3157 if label.atomic_group_id:
3158 assert label.atomic_group_id == atomic_group.id
3159 return label.name
3160 return atomic_group.name
3161
3162
showard170873e2009-01-07 00:22:26 +00003163 def execution_tag(self):
3164 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00003165 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00003166
3167
showarded2afea2009-07-07 20:54:07 +00003168 def execution_path(self):
3169 return self.execution_tag()
3170
3171
showarda9545c02009-12-18 22:44:26 +00003172 def set_started_on_now(self):
3173 self.update_field('started_on', datetime.datetime.now())
3174
3175
3176 def is_hostless(self):
3177 return (self.host_id is None
3178 and self.meta_host is None
3179 and self.atomic_group_id is None)
3180
3181
mbligh36768f02008-02-22 18:28:33 +00003182class Job(DBObject):
showardeab66ce2009-12-23 00:03:56 +00003183 _table_name = 'afe_jobs'
showard6ae5ea92009-02-25 00:11:51 +00003184 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3185 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003186 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003187 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003188
showard77182562009-06-10 00:16:05 +00003189 # This does not need to be a column in the DB. The delays are likely to
3190 # be configured short. If the scheduler is stopped and restarted in
3191 # the middle of a job's delay cycle, the delay cycle will either be
3192 # repeated or skipped depending on the number of Pending machines found
3193 # when the restarted scheduler recovers to track it. Not a problem.
3194 #
3195 # A reference to the DelayedCallTask that will wake up the job should
3196 # no other HQEs change state in time. Its end_time attribute is used
3197 # by our run_with_ready_delay() method to determine if the wait is over.
3198 _delay_ready_task = None
3199
3200 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
3201 # all status='Pending' atomic group HQEs incase a delay was running when the
3202 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003203
showarda3c58572009-03-12 20:36:59 +00003204 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00003205 assert id or row
showarda3c58572009-03-12 20:36:59 +00003206 super(Job, self).__init__(id=id, row=row, **kwargs)
showard9bb960b2009-11-19 01:02:11 +00003207 self._owner_model = None # caches model instance of owner
3208
3209
showardc1a98d12010-01-15 00:22:22 +00003210 def model(self):
3211 return models.Job.objects.get(id=self.id)
3212
3213
showard9bb960b2009-11-19 01:02:11 +00003214 def owner_model(self):
3215 # work around the fact that the Job owner field is a string, not a
3216 # foreign key
3217 if not self._owner_model:
3218 self._owner_model = models.User.objects.get(login=self.owner)
3219 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003220
mblighe2586682008-02-29 22:45:46 +00003221
jadmanski0afbb632008-06-06 21:10:57 +00003222 def is_server_job(self):
3223 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003224
3225
showard170873e2009-01-07 00:22:26 +00003226 def tag(self):
3227 return "%s-%s" % (self.id, self.owner)
3228
3229
jadmanski0afbb632008-06-06 21:10:57 +00003230 def get_host_queue_entries(self):
3231 rows = _db.execute("""
showardeab66ce2009-12-23 00:03:56 +00003232 SELECT * FROM afe_host_queue_entries
jadmanski0afbb632008-06-06 21:10:57 +00003233 WHERE job_id= %s
3234 """, (self.id,))
3235 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003236
jadmanski0afbb632008-06-06 21:10:57 +00003237 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00003238
jadmanski0afbb632008-06-06 21:10:57 +00003239 return entries
mbligh36768f02008-02-22 18:28:33 +00003240
3241
jadmanski0afbb632008-06-06 21:10:57 +00003242 def set_status(self, status, update_queues=False):
3243 self.update_field('status',status)
3244
3245 if update_queues:
3246 for queue_entry in self.get_host_queue_entries():
3247 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003248
3249
showardc1a98d12010-01-15 00:22:22 +00003250 def keyval_dict(self):
3251 return self.model().keyval_dict()
3252
3253
showard77182562009-06-10 00:16:05 +00003254 def _atomic_and_has_started(self):
3255 """
3256 @returns True if any of the HostQueueEntries associated with this job
3257 have entered the Status.STARTING state or beyond.
3258 """
3259 atomic_entries = models.HostQueueEntry.objects.filter(
3260 job=self.id, atomic_group__isnull=False)
3261 if atomic_entries.count() <= 0:
3262 return False
3263
showardaf8b4ca2009-06-16 18:47:26 +00003264 # These states may *only* be reached if Job.run() has been called.
3265 started_statuses = (models.HostQueueEntry.Status.STARTING,
3266 models.HostQueueEntry.Status.RUNNING,
3267 models.HostQueueEntry.Status.COMPLETED)
3268
3269 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003270 return started_entries.count() > 0
3271
3272
showard708b3522009-08-20 23:26:15 +00003273 def _hosts_assigned_count(self):
3274 """The number of HostQueueEntries assigned a Host for this job."""
3275 entries = models.HostQueueEntry.objects.filter(job=self.id,
3276 host__isnull=False)
3277 return entries.count()
3278
3279
showard77182562009-06-10 00:16:05 +00003280 def _pending_count(self):
3281 """The number of HostQueueEntries for this job in the Pending state."""
3282 pending_entries = models.HostQueueEntry.objects.filter(
3283 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3284 return pending_entries.count()
3285
3286
showardd07a5f32009-12-07 19:36:20 +00003287 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003288 """
3289 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003290 are using to set an upper bound on the threshold.
3291 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003292 this job can run.
3293 """
3294 return min(self._hosts_assigned_count(),
3295 atomic_group.max_number_of_machines)
3296
3297
showardd07a5f32009-12-07 19:36:20 +00003298 def _min_hosts_needed_to_run(self):
3299 """Return the minumum number of hsots needed to run this job."""
3300 return self.synch_count
3301
3302
jadmanski0afbb632008-06-06 21:10:57 +00003303 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003304 # NOTE: Atomic group jobs stop reporting ready after they have been
3305 # started to avoid launching multiple copies of one atomic job.
3306 # Only possible if synch_count is less than than half the number of
3307 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003308 pending_count = self._pending_count()
3309 atomic_and_has_started = self._atomic_and_has_started()
3310 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003311 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003312
3313 if not ready:
3314 logging.info(
3315 'Job %s not ready: %s pending, %s required '
3316 '(Atomic and started: %s)',
3317 self, pending_count, self.synch_count,
3318 atomic_and_has_started)
3319
3320 return ready
mbligh36768f02008-02-22 18:28:33 +00003321
3322
jadmanski0afbb632008-06-06 21:10:57 +00003323 def num_machines(self, clause = None):
3324 sql = "job_id=%s" % self.id
3325 if clause:
3326 sql += " AND (%s)" % clause
showardeab66ce2009-12-23 00:03:56 +00003327 return self.count(sql, table='afe_host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003328
3329
jadmanski0afbb632008-06-06 21:10:57 +00003330 def num_queued(self):
3331 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003332
3333
jadmanski0afbb632008-06-06 21:10:57 +00003334 def num_active(self):
3335 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003336
3337
jadmanski0afbb632008-06-06 21:10:57 +00003338 def num_complete(self):
3339 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003340
3341
jadmanski0afbb632008-06-06 21:10:57 +00003342 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003343 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003344
mbligh36768f02008-02-22 18:28:33 +00003345
showard6bb7c292009-01-30 01:44:51 +00003346 def _not_yet_run_entries(self, include_verifying=True):
3347 statuses = [models.HostQueueEntry.Status.QUEUED,
3348 models.HostQueueEntry.Status.PENDING]
3349 if include_verifying:
3350 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3351 return models.HostQueueEntry.objects.filter(job=self.id,
3352 status__in=statuses)
3353
3354
    def _stop_all_entries(self):
        """
        Mark all of this job's not-yet-started entries as Stopped, releasing
        any Pending hosts back to Ready first.  Verifying entries are left
        alone (their special tasks will finish on their own).
        """
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()
3368
showard2bab8f42008-11-12 18:15:22 +00003369 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003370 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003371 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003372 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003373
3374
jadmanski0afbb632008-06-06 21:10:57 +00003375 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003376 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003377 file_path = os.path.join(self.tag(), '.machines')
3378 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003379
3380
showardf1ae3542009-05-11 19:26:02 +00003381 def _next_group_name(self, group_name=''):
3382 """@returns a directory name to use for the next host group results."""
3383 if group_name:
3384 # Sanitize for use as a pathname.
3385 group_name = group_name.replace(os.path.sep, '_')
3386 if group_name.startswith('.'):
3387 group_name = '_' + group_name[1:]
3388 # Add a separator between the group name and 'group%d'.
3389 group_name += '.'
3390 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003391 query = models.HostQueueEntry.objects.filter(
3392 job=self.id).values('execution_subdir').distinct()
3393 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003394 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3395 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003396 if ids:
3397 next_id = max(ids) + 1
3398 else:
3399 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003400 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003401
3402
showarddb502762009-09-09 15:31:20 +00003403 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003404 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003405 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003406 return control_path
mbligh36768f02008-02-22 18:28:33 +00003407
showardb2e2c322008-10-14 17:33:55 +00003408
showard2bab8f42008-11-12 18:15:22 +00003409 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003410 """
3411 @param queue_entry_from_group: A HostQueueEntry instance to find other
3412 group entries on this job for.
3413
3414 @returns A list of HostQueueEntry objects all executing this job as
3415 part of the same group as the one supplied (having the same
3416 execution_subdir).
3417 """
showard2bab8f42008-11-12 18:15:22 +00003418 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003419 return list(HostQueueEntry.fetch(
3420 where='job_id=%s AND execution_subdir=%s',
3421 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003422
3423
showard8cc058f2009-09-08 16:26:33 +00003424 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003425 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003426 execution_path = queue_entries[0].execution_path()
3427 control_path = self._write_control_file(execution_path)
showarda9545c02009-12-18 22:44:26 +00003428 hostnames = ','.join(entry.host.hostname
3429 for entry in queue_entries
3430 if not entry.is_hostless())
mbligh36768f02008-02-22 18:28:33 +00003431
showarddb502762009-09-09 15:31:20 +00003432 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003433 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003434 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003435 ['-P', execution_tag, '-n',
3436 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003437 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003438
jadmanski0afbb632008-06-06 21:10:57 +00003439 if not self.is_server_job():
3440 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003441
showardb2e2c322008-10-14 17:33:55 +00003442 return params
mblighe2586682008-02-29 22:45:46 +00003443
mbligh36768f02008-02-22 18:28:33 +00003444
showardc9ae1782009-01-30 01:42:37 +00003445 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003446 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003447 return True
showard0fc38302008-10-23 00:44:07 +00003448 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003449 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003450 return False
showard21baa452008-10-21 00:08:39 +00003451
showardc9ae1782009-01-30 01:42:37 +00003452
showard8cc058f2009-09-08 16:26:33 +00003453 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003454 do_not_verify = (queue_entry.host.protection ==
3455 host_protections.Protection.DO_NOT_VERIFY)
3456 if do_not_verify:
3457 return False
3458 return self.run_verify
3459
3460
showard8cc058f2009-09-08 16:26:33 +00003461 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003462 """
3463 Get a list of tasks to perform before the host_queue_entry
3464 may be used to run this Job (such as Cleanup & Verify).
3465
3466 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003467 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003468 task in the list calls HostQueueEntry.on_pending(), which
3469 continues the flow of the job.
3470 """
showardc9ae1782009-01-30 01:42:37 +00003471 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003472 task = models.SpecialTask.Task.CLEANUP
3473 elif self._should_run_verify(queue_entry):
3474 task = models.SpecialTask.Task.VERIFY
3475 else:
3476 queue_entry.on_pending()
3477 return
3478
showard9bb960b2009-11-19 01:02:11 +00003479 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003480 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003481 host=models.Host.objects.get(id=queue_entry.host_id),
3482 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003483
3484
showardf1ae3542009-05-11 19:26:02 +00003485 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003486 if len(queue_entries) == 1:
showarda9545c02009-12-18 22:44:26 +00003487 group_subdir_name = queue_entries[0].host.hostname
showard2bab8f42008-11-12 18:15:22 +00003488 else:
showardf1ae3542009-05-11 19:26:02 +00003489 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003490 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003491 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003492 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003493
3494 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003495 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003496
3497
    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick the group of HostQueueEntries that will run this job together.

        @param include_queue_entry: HostQueueEntry that triggered this run;
                always part of the chosen group.

        @returns A list of HostQueueEntry instances (including
                include_queue_entry) to run this job, already assigned a
                shared execution_subdir via _assign_new_group().  Returns an
                empty list (after logging and e-mailing a warning) if fewer
                than synch_count entries could be gathered.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            # An atomic group grabs every Pending entry up to its machine cap.
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                    where=where_clause,
                    params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing so group
            # membership is deterministic.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check. We'll only ever be called if this can be met.
        if len(chosen_entries) < self.synch_count:
            message = ('job %s got less than %s chosen entries: %s' % (
                    self.id, self.synch_count, chosen_entries))
            logging.error(message)
            email_manager.manager.enqueue_notify_email(
                    'Job not started, too few chosen entries', message)
            return []

        group_name = include_queue_entry.get_group_name()

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003537
3538
showard77182562009-06-10 00:16:05 +00003539 def run_if_ready(self, queue_entry):
3540 """
showard8375ce02009-10-12 20:35:13 +00003541 Run this job by kicking its HQEs into status='Starting' if enough
3542 hosts are ready for it to run.
3543
3544 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3545 ready to run.
showard77182562009-06-10 00:16:05 +00003546 """
showardb2e2c322008-10-14 17:33:55 +00003547 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003548 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003549 elif queue_entry.atomic_group:
3550 self.run_with_ready_delay(queue_entry)
3551 else:
3552 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003553
3554
    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the delay cannot be reset.

        Runs the job immediately (via run()) when the configured delay is
        disabled, an existing delay has already expired, or enough Pending
        hosts have accumulated; otherwise marks queue_entry as Waiting so
        the delayed callback can pick it up later.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and to pass to run() when ready.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        over_max_threshold = (self._pending_count() >=
                self._max_hosts_needed_to_run(queue_entry.atomic_group))
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough? Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            self.run(queue_entry)
        else:
            queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003579
showard8cc058f2009-09-08 16:26:33 +00003580
showardd07a5f32009-12-07 19:36:20 +00003581 def request_abort(self):
3582 """Request that this Job be aborted on the next scheduler cycle."""
showardc1a98d12010-01-15 00:22:22 +00003583 self.model().abort()
showardd07a5f32009-12-07 19:36:20 +00003584
3585
showard8cc058f2009-09-08 16:26:33 +00003586 def schedule_delayed_callback_task(self, queue_entry):
3587 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3588
showard77182562009-06-10 00:16:05 +00003589 if self._delay_ready_task:
3590 return None
3591
showard8cc058f2009-09-08 16:26:33 +00003592 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3593
showard77182562009-06-10 00:16:05 +00003594 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003595 logging.info('Job %s done waiting for extra hosts.', self)
3596 # Check to see if the job is still relevant. It could have aborted
3597 # while we were waiting or hosts could have disappearred, etc.
showardd07a5f32009-12-07 19:36:20 +00003598 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003599 logging.info('Job %s had too few Pending hosts after waiting '
3600 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003601 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003602 return
showard77182562009-06-10 00:16:05 +00003603 return self.run(queue_entry)
3604
showard708b3522009-08-20 23:26:15 +00003605 logging.info('Job %s waiting up to %s seconds for more hosts.',
3606 self.id, delay)
showard77182562009-06-10 00:16:05 +00003607 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3608 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003609 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003610
3611
3612 def run(self, queue_entry):
3613 """
3614 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003615 """
3616 if queue_entry.atomic_group and self._atomic_and_has_started():
3617 logging.error('Job.run() called on running atomic Job %d '
3618 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003619 return
3620 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003621 if queue_entries:
3622 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003623
3624
showard8cc058f2009-09-08 16:26:33 +00003625 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003626 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003627 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003628 self.abort_delay_ready_task()
3629
3630
3631 def abort_delay_ready_task(self):
3632 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003633 if self._delay_ready_task:
3634 # Cancel any pending callback that would try to run again
3635 # as we are already running.
3636 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003637
showardd2014822009-10-12 20:26:58 +00003638
showardb000a8d2009-07-28 20:02:07 +00003639 def __str__(self):
3640 return '%s-%s' % (self.id, self.owner)
3641
3642
mbligh36768f02008-02-22 18:28:33 +00003643if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003644 main()