blob: 2d878bbf50dfd1d8b9619e4887d9add05eee7638 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
Eric Li6f27d4f2010-09-29 10:55:17 -070010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
# Pid-file name prefixes used with utils.write_pid()/program_is_alive().
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# $AUTOTEST_DIR, when set, overrides the path derived from this file's
# location.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state, populated during startup.
_db = None                 # DatabaseConnection; set in initialize()
_shutdown = False          # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False      # set by --test; swaps in a dummy autoserv binary
_drone_manager = None      # set in initialize_globals()
60
def _get_pidfile_timeout_secs():
    """@returns How long (in seconds) to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
# Site hook returning extra metahost schedulers; falls back to an empty tuple
# when no site-specific module is installed.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but no default is set.

    A no-op when drone sets are disabled, or when a default drone set name
    is configured.
    """
    if not models.DroneSet.drone_sets_enabled():
        return
    if models.DroneSet.default_drone_set_name():
        return
    raise SchedulerError('Drone sets are enabled, but no default is set')
81
82
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    # Currently the only check is the drone set configuration.
    _verify_default_drone_set_exists()
86
87
def main():
    """Top-level entry point.

    Runs the scheduler proper, logs any exception that escapes it (SystemExit
    is re-raised untouched), and always removes this process's pid file on
    the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits propagate without being logged as errors.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        # Whatever happened, never leave a stale pid file behind.
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000099
100
def main_without_exception_handling():
    """Parse arguments, perform one-time setup, then run the dispatcher loop.

    Usage: monitor_db [options] results_dir

    Exits with status 1 if the scheduler is disabled in the global config.
    When the main loop ends (shutdown flag or uncaught exception), queued
    emails are flushed and the status server, drone manager and database
    connection are shut down.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Optional site-specific initialization hook.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until handle_sigint() sets the
        # shutdown flag.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown, also reached after an uncaught exception above.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000167
168
def setup_logging():
    """Configure scheduler logging.

    The log directory and file name may be overridden through the
    AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; otherwise the SchedulerLoggingConfig defaults apply.
    """
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            logfile_name=os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME'))
175
176
def handle_sigint(signum, frame):
    """SIGINT handler: ask the dispatcher loop to exit after its current tick.

    @param signum: Signal number (unused).
    @param frame: Current stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000181
182
def initialize():
    """One-time scheduler startup.

    Enforces a single running instance via the pid file, connects the
    database, puts Django in autocommit and disables the readonly connection,
    installs the SIGINT handler, and initializes the drone manager from the
    configured drone and results hosts.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to start if another monitor_db is already alive.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # 'drones' is a comma-separated list of drone hostnames; results are
    # gathered on results_host. Both default to localhost.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000220
221
def initialize_globals():
    """Bind module-level singletons; currently just the drone manager."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
225
226
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    argv = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        argv.extend(['-m', machines])
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        argv.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        argv.append('--verbose')
    return argv + extra_args
251
252
class SchedulerError(Exception):
    """Raised when the scheduler detects an inconsistent state (e.g. no
    metahost scheduler is able to handle a queue entry)."""
255
256
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors
    that affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """@returns dict of host id -> Host for unlocked hosts whose status
        is NULL or 'Ready' and which have no active queue entry."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render id_list as a comma-separated string for an SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (which must contain a single %s for the id list) and
        return its two id columns folded into a dict of sets.

        @param flip: if True, the second column becomes the dict key.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_id)} ({right: left}
        when flip is True)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns dict job id -> set of ACL group ids of the job's owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns dict job id -> set of host ids ineligible for that job."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns dict job id -> set of dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns dict host id -> set of ACL group ids the host is in."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns (labels_to_hosts, hosts_to_labels) dicts of id sets for
        the given hosts; both empty when host_ids is empty."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns dict label id -> Label for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give every metahost scheduler a chance to recover on startup."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild the cached eligibility state for this scheduling cycle.

        Loads ready hosts plus, for the jobs behind pending_queue_entries,
        their ACLs, ineligible hosts and dependencies, and the host ACL/label
        mappings used by the eligibility checks below.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the scheduler tick to every metahost scheduler."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """@returns a new set of host ids cached for the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached host set of label_id."""
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove and return an available host; KeyError if not available."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """@returns a new set of host ids ineligible for the entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  If more than one atomic group is
        found among the labels, an error is logged and one of the group ids
        is returned arbitrarily.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group
                (invalid labels are excluded).
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id satisfies the ACL, dependency, only_if_needed and
        atomic group constraints of queue_entry's job."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy when the host is known to this cycle and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's specific host, or None if it is
        ineligible or no longer available this cycle."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule one queue entry.

        Non-metahost entries return their claimed Host (or None). Metahost
        entries are handed to the first metahost scheduler that accepts them
        and None is returned.

        @raises SchedulerError if no metahost scheduler can handle the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
658
659
class Dispatcher(object):
    """
    The scheduler's main-loop object.

    Each tick() refreshes state from the drone manager, runs periodic
    cleanups, aborts and recovers work, schedules special tasks and new
    jobs, and drives every active Agent one step forward.

    Bookkeeping structures:
      _agents -- list of all active Agent objects.
      _host_agents -- maps host id -> set of Agents using that host.
      _queue_entry_agents -- maps HostQueueEntry id -> set of Agents
          working on that entry.
    """
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval (seconds) between garbage-collector stats reports, from
        # the gc_stats_interval_mins global-config value (default 6 hours).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        """
        One-time startup: initialize cleanups, recover orphaned processes
        and (optionally) dead hosts left over from a previous scheduler run.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
        """Run one iteration of the scheduler main loop, in a fixed order."""
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        # Flush any notification emails queued up during this tick.
        email_manager.manager.send_queued_emails()
        # Keep Django from accumulating its per-request query log forever.
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        """Trigger the periodic and 24-hour cleanups, if they are due."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        """Periodically force a full GC pass and log collector statistics."""
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        # Add agent to the per-id sets for every id it touches.
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        # Inverse of _register_agent_for_ids; the ids must be registered.
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """Wrap agent_task in an Agent, activate it and index it by the
        hosts and queue entries it uses."""
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        """Drop a finished agent from the active list and all indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        # True if the host has a queued (not yet started) SpecialTask.
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        """Reattach to autoserv processes left over from a prior scheduler
        run, and clean up anything that can't be recovered."""
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        # Recovery covers active queue entries plus active special tasks.
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """Build AgentTasks for every active queue entry not already owned
        by some agent, de-duplicating synchronous job groups."""
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        # One AgentTask per incomplete SpecialTask in the requested state.
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        @raises SchedulerError if the entry's status has no matching task.
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        # Parsing/Archiving entries no longer occupy their host, so they are
        # exempt from the one-agent-per-host invariant.
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        @raises SchedulerError if entry's host already has an agent.
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        @raises SchedulerError if no AgentTask class matches the task type.
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        # Must happen before the drone-manager refresh that reads pidfiles.
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        """Recover each AgentTask; any autoserv process not claimed by a
        recovered task is reported as an orphan."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        """Yield queue entries in the given status that no agent owns."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                                 % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        """Email about unclaimed autoserv processes; optionally die (per the
        die_on_orphans global-config flag)."""
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        # Re-run the Pending-state handler for entries with no agent.
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        """Fail fast if any Verifying entry has no pending Cleanup/Verify
        SpecialTask to carry it forward (that state cannot self-recover)."""
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """Schedule a Cleanup SpecialTask for every unlocked, valid host
        matching the given SQL condition that has no agent or pending task."""
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        # Hostless entries skip host scheduling and start immediately.
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        """Assign hosts to queued entries and kick off their pre-job tasks."""
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        # (Re)create agents for entries already in an active status.
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        # Entries in Waiting may have a delayed-callback task to schedule.
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        # Kick off verify/cleanup (pre-job) special tasks for this entry.
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """Abort entries flagged as aborted, then stop their jobs if needed."""
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """Apply total and per-cycle process throttling to decide whether
        this agent may start now."""
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        """Tick every agent, starting new ones subject to throttling and
        removing those that have finished."""
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        """Instantiate jobs from RecurringRun templates whose start date has
        passed, then reschedule or delete the recurrences."""
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is extracted but never passed to
            # create_new_job below -- confirm whether recurring jobs should
            # carry the template's dependencies.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1232
1233
showard170873e2009-01-07 00:22:26 +00001234class PidfileRunMonitor(object):
1235 """
1236 Client must call either run() to start a new process or
1237 attach_to_existing_process().
1238 """
mbligh36768f02008-02-22 18:28:33 +00001239
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).  See
        _read_pidfile(), which raises it when the pidfile contents are invalid.
        """
mbligh36768f02008-02-22 18:28:33 +00001245
1246
showard170873e2009-01-07 00:22:26 +00001247 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001248 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001249 self._start_time = None
1250 self.pidfile_id = None
1251 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001252
1253
showard170873e2009-01-07 00:22:26 +00001254 def _add_nice_command(self, command, nice_level):
1255 if not nice_level:
1256 return command
1257 return ['nice', '-n', str(nice_level)] + command
1258
1259
    def _set_start_time(self):
        # Record when we started (or attached to) the monitored process.
        self._start_time = time.time()
1262
1263
showard418785b2009-11-23 20:19:59 +00001264 def run(self, command, working_directory, num_processes, nice_level=None,
1265 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001266 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001267 assert command is not None
1268 if nice_level is not None:
1269 command = ['nice', '-n', str(nice_level)] + command
1270 self._set_start_time()
1271 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001272 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001273 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001274 paired_with_pidfile=paired_with_pidfile, username=username,
1275 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001276
1277
showarded2afea2009-07-07 20:54:07 +00001278 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001279 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001280 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001281 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001282 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001283 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001284 if num_processes is not None:
1285 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001286
1287
jadmanski0afbb632008-06-06 21:10:57 +00001288 def kill(self):
showard170873e2009-01-07 00:22:26 +00001289 if self.has_process():
1290 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001291
mbligh36768f02008-02-22 18:28:33 +00001292
showard170873e2009-01-07 00:22:26 +00001293 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001294 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001295 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001296
1297
showard170873e2009-01-07 00:22:26 +00001298 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001299 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001300 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001301 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001302
1303
showard170873e2009-01-07 00:22:26 +00001304 def _read_pidfile(self, use_second_read=False):
1305 assert self.pidfile_id is not None, (
1306 'You must call run() or attach_to_existing_process()')
1307 contents = _drone_manager.get_pidfile_contents(
1308 self.pidfile_id, use_second_read=use_second_read)
1309 if contents.is_invalid():
1310 self._state = drone_manager.PidfileContents()
1311 raise self._PidfileException(contents)
1312 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001313
1314
showard21baa452008-10-21 00:08:39 +00001315 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001316 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1317 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001318 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001319 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001320
1321
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, handling the cases where the
        process has already been declared lost, has no recorded pid, or has
        exited without recording an exit status.
        """
        if self.lost_process:
            # already gave up on this process; state is frozen as a failure
            return

        self._read_pidfile()

        if self._state.process is None:
            # no pid recorded yet -- may still be within the startup timeout
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001344
showard21baa452008-10-21 00:08:39 +00001345
1346 def _get_pidfile_info(self):
1347 """\
1348 After completion, self._state will contain:
1349 pid=None, exit_status=None if autoserv has not yet run
1350 pid!=None, exit_status=None if autoserv is running
1351 pid!=None, exit_status!=None if autoserv has completed
1352 """
1353 try:
1354 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001355 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001356 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001357
1358
showard170873e2009-01-07 00:22:26 +00001359 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001360 """\
1361 Called when no pidfile is found or no pid is in the pidfile.
1362 """
showard170873e2009-01-07 00:22:26 +00001363 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001364 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001365 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001366 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001367 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001368
1369
showard35162b02009-03-03 02:17:30 +00001370 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001371 """\
1372 Called when autoserv has exited without writing an exit status,
1373 or we've timed out waiting for autoserv to write a pid to the
1374 pidfile. In either case, we just return failure and the caller
1375 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001376
showard170873e2009-01-07 00:22:26 +00001377 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001378 """
1379 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001380 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001381 self._state.exit_status = 1
1382 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001383
1384
jadmanski0afbb632008-06-06 21:10:57 +00001385 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001386 self._get_pidfile_info()
1387 return self._state.exit_status
1388
1389
1390 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001391 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001392 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001393 if self._state.num_tests_failed is None:
1394 return -1
showard21baa452008-10-21 00:08:39 +00001395 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001396
1397
showardcdaeae82009-08-31 18:32:48 +00001398 def try_copy_results_on_drone(self, **kwargs):
1399 if self.has_process():
1400 # copy results logs into the normal place for job results
1401 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1402
1403
1404 def try_copy_to_results_repository(self, source, **kwargs):
1405 if self.has_process():
1406 _drone_manager.copy_to_results_repository(self.get_process(),
1407 source, **kwargs)
1408
1409
class Agent(object):
    """
    Wraps a single task for use by the Dispatcher, which polls its agents
    every cycle until they report completion.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """

    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task
        # Filled in by Dispatcher.add_agent().
        self.dispatcher = None
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids
        self.started = False
        self.finished = False


    def tick(self):
        """Advance the task one step; mark this agent finished once done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """True once the wrapped task has completed (or aborted)."""
        return self.finished


    def abort(self):
        """Ask the task to abort.  Tasks can choose to ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001463
showardd3dc1992009-04-22 21:01:40 +00001464
class AgentTask(object):
    """
    Abstract base for a single unit of scheduler work (normally one autoserv
    invocation).  Subclasses supply the command line, working directory and
    owner; this class drives the PidfileRunMonitor lifecycle: run the
    process, poll for an exit code, recover after a scheduler restart, and
    abort on request.
    """

    class _NullMonitor(object):
        # Placeholder "paired" monitor for tasks with no predecessor process.
        # It always reports a process, so _check_paired_results_exist()
        # trivially passes.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """
        Record the host and queue-entry ids this task covers; the enclosing
        Agent exposes them so the dispatcher can look agents up by host or
        queue entry.
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Start the task if it hasn't started, then check on its process."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitored process; finish once it has an exit code."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running (or not yet reporting)
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog.  Idempotent."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        Run before the task's process starts.  To be overridden; overrides
        must call this base implementation.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the task's log file, or None when no name was given.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Copy the task's log file to the results repository, if any."""
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        Run after the task finishes.  To be overridden; overrides must call
        this base implementation.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark the task done and aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Assert all entries share a single execution path and return it."""
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy execution results to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) copy from this monitor's process
                instead of self.monitor's.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries off for parsing by setting their status.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        # Hand the entries off for archiving by setting their status.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        # Without the paired process's results the task cannot run; notify
        # and finish as failed.
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch the task's process via a fresh PidfileRunMonitor."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
        """
        Return the drone hostnames this task may run on, or None when drone
        sets are disabled (i.e. any drone is acceptable).
        """
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("AgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warn('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warn('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        """Tell the drone manager to track this task's pidfile(s)."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """
        Reattach to a process left over from a previous scheduler run.  If
        no such process is found, the task waits to be started normally.
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """
        Sanity check that every queue entry (and, optionally, its host) is in
        a status this task may legally start from; raise SchedulerError
        otherwise.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise SchedulerError('%s attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise SchedulerError('%s attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001777
1778
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval line (without a trailing newline)."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of this task's keyval file.  Subclasses must override this."""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval to the job's keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        lines = [self._format_keyval(field, value)]
        _drone_manager.write_lines_to_file(
                self._keyval_path(), lines,
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        queued_time = int(time.mktime(job.created_on.timetuple()))
        return 'job_queued', queued_time


    def _write_job_finished(self):
        """Record the job's finish time in its keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict's entries to the execution as keyval_path."""
        formatted = [self._format_keyval(key, value)
                     for key, value in keyval_dict.iteritems()]
        # always end with a newline to allow additional keyvals to be written
        keyval_contents = '\n'.join(formatted) + '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write keyval_dict to this task's own keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform and labels to a per-host keyval file."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        quoted_labels = [urllib.quote(label) for label in all_labels]
        keyval_dict = dict(platform=platform,
                           labels=','.join(quoted_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1829
1830
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # Must be overridden with a models.SpecialTask.Task value.
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask row this agent task executes.
        @param extra_command_args: extra autoserv command-line arguments
                identifying the kind of task (e.g. ['-v'] for verify).
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        # Special tasks may be system-generated, with no requesting user.
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # Mark the SpecialTask row active and record host platform/labels.
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """
        Fail the associated queue entry: write its job keyvals, copy this
        task's logs into the job's results, and send the entry on to parsing
        or archiving.  No-op for metahost or already-aborted entries.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        # register the job's own pidfile so parsing/archiving can find it
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001925
1926
class RepairTask(SpecialAgentTask):
    """Runs autoserv --repair (-R) to try to bring a host back to life."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the SpecialTask to run; its queue entry (if any) is
                marked failed when this repair fails.
        """
        # Normalize the host-protection level to its attribute name so it
        # can be passed on the autoserv command line.
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        """Mark the host as being repaired before autoserv starts."""
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        """Record the repair outcome on the host (and queue entry, if any)."""
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001963
1964
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run on a host before a job (e.g. verify,
    cleanup).  On failure, the task's debug log is preserved next to the
    job's results and a repair task is scheduled, unless the host's
    protection level says failures should effectively be ignored.
    """

    def _copy_to_results_repository(self):
        # Only meaningful for a concrete (non-metahost) queue entry: copies
        # this task's autoserv debug log alongside the job's results.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        """On failure, preserve logs and schedule a repair (or fail the
        entry when repair has already been tried)."""
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # A repair task already exists for this entry -- presumably it
            # was already attempted -- so don't chain another; mark the host
            # repair-failed and fail the entry instead.
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
2013 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002014
showard8fe93b52008-11-18 17:53:22 +00002015
class VerifyTask(PreJobTask):
    """Runs autoserv -v to verify that a host is in working order."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Flag the host (and queue entry, if any) as verifying, and drop
        redundant queued manual reverifies for the same host."""
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify will do; delete any other queued manual reverifies for
        # this host rather than keeping records of the extra requests.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False,
                queue_entry=None).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        """On success, move the queue entry to Pending, or mark the host
        Ready when verifying standalone."""
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002050
2051
class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running
    # standalone against the host (not related to the job), so it's not
    # considered a PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        # recover_run_monitor is accepted only for signature compatibility.
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        # Follow-up work only applies to a successful cleanup that is tied
        # to a queue entry.
        if not self.queue_entry or not self.success:
            return

        needs_verify = (
                self.queue_entry.job.run_verify
                and self.host.protection !=
                        host_protections.Protection.DO_NOT_VERIFY)
        if not needs_verify:
            self.queue_entry.on_pending()
            return

        # schedule a verify task for this entry before it can run
        hqe = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=hqe,
                task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        # all entries share one job, so the first entry identifies it
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        """Return the keyval file path inside the execution directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution dir; return its
        path as known to the drone manager."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line used to run this job."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # hostless entries contribute no hostname to the -m list
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        if not self.job.is_server_job():
            params.append('-c')  # run the control file client-side

        return params


    @property
    def num_processes(self):
        # one autoserv process slot per queue entry
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark every queue entry as running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        """Leave a marker file in the results dir noting the lost process."""
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        # nothing to record if the process was never started/attached
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        """Append an INFO comment line to the job's status.log."""
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record aborted_by/aborted_on keyvals and a status.log comment."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
2234
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        if len(self.queue_entries) == 1 and self.job.synch_count == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # move entries to the log-gathering stage; hosts stay busy
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)
2268
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a single hostless (no-machine) queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # with no host, the execution subdir is a fixed placeholder
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)
2284
class PostJobTask(AgentTask):
    """
    Base class for tasks that run over a job's results after autoserv has
    finished (log gathering, parsing, archiving).  Attaches a
    PidfileRunMonitor to the finished autoserv process so the job's final
    disposition (exit code, abort state) can be read.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # in testing mode just run a trivially-succeeding command
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command line to run over results_dir (abstract)."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        # pair our output files with the original autoserv process
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """
        Return True if the job's queue entries were aborted.  All entries
        are expected to agree; on disagreement, send a warning email and
        conservatively treat the job as aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job's outcome to a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2363
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        # recover_run_monitor is unused here; kept for signature compatibility
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """
        Decide whether each host should be cleaned up (rebooted) based on
        the job's reboot_after setting and the job outcome; otherwise mark
        the host READY.
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # no process to inspect -- treat the job as unsuccessful
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally; no crashinfo collection is needed
            self.finished(True)
2450
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # Counter of currently-running processes.  It is read and written
    # through ``cls``, so each concrete subclass maintains its own
    # independent count against its own _max_processes() limit.
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        # concrete subclasses must define their own limit
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Until the subprocess has actually launched, every tick is another
        # chance to start it; once it's running, behave like a normal task.
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # Unlike the base class, run() only launches the process when the
        # throttle currently permits it.
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard21baa452008-10-21 00:08:39 +00002511
class FinalReparseTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the TKO parser over job results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # parser processes are excluded from drone process accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
2551
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the results-archiving control file."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            # leave a marker file so the failure is visible in the results dir
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            _drone_manager.write_lines_to_file(
                    failed_file,
                    ['Archiving failed with exit code %s'
                     % self.monitor.exit_code()],
                    paired_with_process=self._paired_with_monitor().get_process())
        self._set_all_statuses(self._final_status())
2599
# script entry point; main() is defined earlier in this file
if __name__ == '__main__':
    main()