blob: 5b7e593e4f0aa14362f747099849a2a236036996 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
showard549afad2009-08-20 23:33:36 +000030BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
31PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000032
mbligh36768f02008-02-22 18:28:33 +000033RESULTS_DIR = '.'
34AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000035DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000036AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
37
38if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000039 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000040AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
41AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
42
43if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000044 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000045
showard35162b02009-03-03 02:17:30 +000046# error message to leave in results dir when an autoserv process disappears
47# mysteriously
48_LOST_PROCESS_ERROR = """\
49Autoserv failed abnormally during execution for this job, probably due to a
50system error on the Autotest server. Full results may not be available. Sorry.
51"""
52
mbligh6f8bab42008-02-29 22:45:14 +000053_db = None
mbligh36768f02008-02-22 18:28:33 +000054_shutdown = False
showard170873e2009-01-07 00:22:26 +000055_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
56_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000057_testing_mode = False
jamesrenc44ae992010-02-19 00:12:54 +000058_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000059
60
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    # The config stores the timeout in minutes; callers want seconds.
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
# Site hook: resolves to the site-specific get_metahost_schedulers() if the
# site_metahost_scheduler module exists, otherwise to a stub returning an
# empty tuple (i.e. no extra site schedulers).
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled without a default.

    @raises SchedulerError when drone sets are turned on but no default
            drone set name is configured.
    """
    # Guard-clause form; preserves short-circuit: the default name is only
    # looked up when drone sets are actually enabled.
    if not models.DroneSet.drone_sets_enabled():
        return
    if not models.DroneSet.default_drone_set_name():
        raise SchedulerError('Drone sets are enabled, but no default is set')
81
82
def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler.

    Currently this only verifies the drone-set configuration; raises
    SchedulerError (via _verify_default_drone_set_exists) on inconsistency.
    """
    _verify_default_drone_set_exists()
86
87
def main():
    """Entry point: run the scheduler, log escaping exceptions, and always
    remove the pidfile on the way out."""
    # Unified try/except/finally (equivalent to the older nested form).
    try:
        main_without_exception_handling()
    except SystemExit:
        # A deliberate exit (e.g. scheduler disabled) is not an error.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000099
100
def main_without_exception_handling():
    """Parse arguments, perform one-time setup, and run the dispatcher loop.

    Expects exactly one positional argument: the results directory.  Exits
    with status 1 if the scheduler is disabled in the global config.  On any
    exception from the main loop, queued emails are flushed and the status
    server, drone manager and DB connection are shut down before returning.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific init hook (falls back to the no-op dummy).
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Testing mode: substitute a dummy autoserv and flag the rest of the
        # module (see initialize()) to use the stress-test database.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: tick until a SIGINT flips the _shutdown flag.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately broad: any escaping exception is reported by email
        # before the orderly shutdown below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000167
168
def setup_logging():
    """Configure scheduler logging.

    Honors the AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME
    environment variables as overrides for the log location and file name.
    """
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
175
176
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after its current tick.

    @param signum - The signal number (unused).
    @param frame - The interrupted stack frame (unused).
    """
    global _shutdown
    _shutdown = True
    logging.info('Shutdown request received.')
mbligh36768f02008-02-22 18:28:33 +0000181
182
def initialize():
    """One-time scheduler startup: pidfile, DB, signals and drone manager.

    Aborts (sys.exit(1)) if another monitor_db instance holds a live pidfile.
    Establishes the module-global _db connection, installs the SIGINT
    handler, and initializes the module-global _drone_manager from config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same results/DB at once.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Redirect all DB access to the stress-test database.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drone and results hosts come from the scheduler config section;
    # both default to localhost for single-machine installs.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000220
221
def initialize_globals():
    """Bind the module-global _drone_manager to the singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
225
226
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        # Fall back to the queue entry's job when no job was given.
        job = job or queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
251
252
class SchedulerError(Exception):
    """Raised when the scheduler detects an inconsistent state.

    Raised by HostScheduler during host/job matching, and by the startup
    sanity checks (e.g. drone-set configuration).
    """
255
256
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries.  It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.

    Cached state (rebuilt each refresh()):
      _hosts_available   - {host_id: Host} of ready, unlocked, inactive hosts
      _job_acls          - {job_id: set(aclgroup_id)}
      _ineligible_hosts  - {job_id: set(host_id)}
      _job_dependencies  - {job_id: set(label_id)}
      _host_acls         - {host_id: set(aclgroup_id)}
      _label_hosts       - {label_id: set(host_id)}
      _host_labels       - {host_id: set(label_id)}
      _labels            - {label_id: Label}
    """
    def __init__(self):
        # Built-in metahost schedulers, possibly augmented below by
        # site-specific ones selected via global_config.
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready/status-less hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated SQL IN() list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with a %s IN-list hole) and group rows.

        @param query - SQL with a single %s placeholder for the id list.
        @param id_list - ids substituted into the placeholder; empty -> {}.
        @param flip - if True, group by the second column instead.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left_id, right_id) rows into {left_id: set(right_id)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """@returns {job_id: set(aclgroup_id)} via the jobs' owners."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """@returns {job_id: set(host_id)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """@returns {job_id: set(label_id)} of label dependencies."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """@returns {host_id: set(aclgroup_id)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns ({label_id: set(host_id)}, {host_id: set(label_id)})."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        # Same rows grouped both ways: by label, and (flipped) by host.
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """@returns {label_id: Label} for all labels."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover its state."""
        # NOTE: the loop variable shadows the metahost_scheduler module
        # within this method body.
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all cached scheduling state for this tick.

        @param pending_queue_entries - HostQueueEntries awaiting scheduling;
                only their jobs' ACLs/dependencies/ineligibilities are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the per-tick callback to each metahost scheduler."""
        # NOTE: the loop variable shadows the metahost_scheduler module
        # within this method body.
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """@returns A copy of the set of host ids carrying label_id."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        # Mutates the cached label->hosts mapping for this tick only.
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Claim host_id for this tick; it is no longer available."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """@returns A copy of the set of host ids barred from the entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job neither
        depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # Logged (not raised) despite the docstring; one id is returned.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host_id passes ACL, dependency, only_if_needed and atomic
        group checks for the entry's job (invalid hosts always pass)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is still available this tick and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's specific host, or None if ineligible
        or already used this tick."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if host_id is still available this tick and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule one queue entry: directly for non-metahosts, otherwise
        via the first metahost scheduler that accepts it.

        @raises SchedulerError if no metahost scheduler can handle the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
658
659
showard170873e2009-01-07 00:22:26 +0000660class Dispatcher(object):
    def __init__(self):
        """Set up dispatcher state; call initialize() before the first tick()."""
        # Active Agent objects, one per running task group.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes mapping host id / queue entry id -> set of Agents touching
        # that object; kept in sync by add_agent_task()/remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between gc stats reports, converted from minutes to seconds.
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000677
mbligh36768f02008-02-22 18:28:33 +0000678
    def initialize(self, recover_hosts=True):
        """
        One-time startup: prime the cleanup schedules, then recover state
        left over from a previous scheduler process.

        @param recover_hosts: if True, also reverify dead ('Repair Failed')
                hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()
690
mbligh36768f02008-02-22 18:28:33 +0000691
    def tick(self):
        """
        Run one scheduler cycle.  The ordering is deliberate: drone state is
        refreshed first, aborts are processed before new scheduling, agents
        are ticked after all scheduling, and queued drone actions and emails
        are flushed at the end of the cycle.
        """
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        # Drop Django's accumulated query log so memory doesn't grow unbounded.
        django.db.reset_queries()
        self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000708
showard97aed502008-11-04 02:01:24 +0000709
mblighf3294cc2009-04-08 21:17:38 +0000710 def _run_cleanup(self):
711 self._periodic_cleanup.run_cleanup_maybe()
712 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000713
mbligh36768f02008-02-22 18:28:33 +0000714
showardf13a9e22009-12-18 22:54:09 +0000715 def _garbage_collection(self):
716 threshold_time = time.time() - self._seconds_between_garbage_stats
717 if threshold_time < self._last_garbage_stats_time:
718 # Don't generate these reports very often.
719 return
720
721 self._last_garbage_stats_time = time.time()
722 # Force a full level 0 collection (because we can, it doesn't hurt
723 # at this interval).
724 gc.collect()
725 logging.info('Logging garbage collector stats on tick %d.',
726 self._tick_count)
727 gc_stats._log_garbage_collector_stats()
728
729
showard170873e2009-01-07 00:22:26 +0000730 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
731 for object_id in object_ids:
732 agent_dict.setdefault(object_id, set()).add(agent)
733
734
735 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
736 for object_id in object_ids:
737 assert object_id in agent_dict
738 agent_dict[object_id].remove(agent)
739
740
showardd1195652009-12-08 22:21:02 +0000741 def add_agent_task(self, agent_task):
742 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000743 self._agents.append(agent)
744 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000745 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
746 self._register_agent_for_ids(self._queue_entry_agents,
747 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000748
showard170873e2009-01-07 00:22:26 +0000749
750 def get_agents_for_entry(self, queue_entry):
751 """
752 Find agents corresponding to the specified queue_entry.
753 """
showardd3dc1992009-04-22 21:01:40 +0000754 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000755
756
757 def host_has_agent(self, host):
758 """
759 Determine if there is currently an Agent present using this host.
760 """
761 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000762
763
jadmanski0afbb632008-06-06 21:10:57 +0000764 def remove_agent(self, agent):
765 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000766 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
767 agent)
768 self._unregister_agent_for_ids(self._queue_entry_agents,
769 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000770
771
showard8cc058f2009-09-08 16:26:33 +0000772 def _host_has_scheduled_special_task(self, host):
773 return bool(models.SpecialTask.objects.filter(host__id=host.id,
774 is_active=False,
775 is_complete=False))
776
777
    def _recover_processes(self):
        """
        Reattach to processes that were running when the previous scheduler
        process stopped.  Pidfiles must be registered before the drone
        refresh so their contents get fetched; orphan handling and drone
        reinitialization happen afterwards.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000790
showard170873e2009-01-07 00:22:26 +0000791
showardd1195652009-12-08 22:21:02 +0000792 def _create_recovery_agent_tasks(self):
793 return (self._get_queue_entry_agent_tasks()
794 + self._get_special_task_agent_tasks(is_active=True))
795
796
    def _get_queue_entry_agent_tasks(self):
        """
        Build one AgentTask per group of active queue entries.  Synchronous
        jobs share a single task, so entries already consumed by a group are
        skipped via used_queue_entries.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000822
823
showardd1195652009-12-08 22:21:02 +0000824 def _get_special_task_agent_tasks(self, is_active=False):
825 special_tasks = models.SpecialTask.objects.filter(
826 is_active=is_active, is_complete=False)
827 return [self._get_agent_task_for_special_task(task)
828 for task in special_tasks]
829
830
831 def _get_agent_task_for_queue_entry(self, queue_entry):
832 """
833 Construct an AgentTask instance for the given active HostQueueEntry,
834 if one can currently run it.
835 @param queue_entry: a HostQueueEntry
836 @returns an AgentTask to run the queue entry
837 """
838 task_entries = queue_entry.job.get_group_entries(queue_entry)
839 self._check_for_duplicate_host_entries(task_entries)
840
841 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
842 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000843 if queue_entry.is_hostless():
844 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000845 return QueueTask(queue_entries=task_entries)
846 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
847 return GatherLogsTask(queue_entries=task_entries)
848 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
849 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000850 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
851 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000852
853 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000854 'invalid status %s: %s'
855 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000856
857
858 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000859 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
860 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000861 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000862 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000863 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000864 if using_host:
showardd1195652009-12-08 22:21:02 +0000865 self._assert_host_has_no_agent(task_entry)
866
867
868 def _assert_host_has_no_agent(self, entry):
869 """
870 @param entry: a HostQueueEntry or a SpecialTask
871 """
872 if self.host_has_agent(entry.host):
873 agent = tuple(self._host_agents.get(entry.host.id))[0]
874 raise SchedulerError(
875 'While scheduling %s, host %s already has a host agent %s'
876 % (entry, entry.host, agent.task))
877
878
879 def _get_agent_task_for_special_task(self, special_task):
880 """
881 Construct an AgentTask class to run the given SpecialTask and add it
882 to this dispatcher.
883 @param special_task: a models.SpecialTask instance
884 @returns an AgentTask to run this SpecialTask
885 """
886 self._assert_host_has_no_agent(special_task)
887
888 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
889 for agent_task_class in special_agent_task_classes:
890 if agent_task_class.TASK_TYPE == special_task.task:
891 return agent_task_class(task=special_task)
892
893 raise SchedulerError('No AgentTask class for task', str(special_task))
894
895
896 def _register_pidfiles(self, agent_tasks):
897 for agent_task in agent_tasks:
898 agent_task.register_necessary_pidfiles()
899
900
901 def _recover_tasks(self, agent_tasks):
902 orphans = _drone_manager.get_orphaned_autoserv_processes()
903
904 for agent_task in agent_tasks:
905 agent_task.recover()
906 if agent_task.monitor and agent_task.monitor.has_process():
907 orphans.discard(agent_task.monitor.get_process())
908 self.add_agent_task(agent_task)
909
910 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000911
912
showard8cc058f2009-09-08 16:26:33 +0000913 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000914 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
915 % status):
showard0db3d432009-10-12 20:29:15 +0000916 if entry.status == status and not self.get_agents_for_entry(entry):
917 # The status can change during iteration, e.g., if job.run()
918 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000919 yield entry
920
921
    def _check_for_remaining_orphan_processes(self, orphans):
        """
        Email a notification about autoserv processes no recovered task
        claimed; optionally abort the scheduler if configured to.

        @param orphans: set of Process objects left unclaimed after recovery.
        @raises RuntimeError: when the 'die_on_orphans' config flag is set.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000934
showard170873e2009-01-07 00:22:26 +0000935
showard8cc058f2009-09-08 16:26:33 +0000936 def _recover_pending_entries(self):
937 for entry in self._get_unassigned_entries(
938 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000939 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000940 entry.on_pending()
941
942
    def _check_for_unrecovered_verifying_entries(self):
        """
        Fail fast if any Verifying queue entry has no incomplete Cleanup or
        Verify SpecialTask left to move it forward -- such an entry would
        otherwise be stranded forever.

        @raises SchedulerError: listing every stranded entry found.
        """
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000961
962
    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            # lower list index sorts first (sorted() is ascending)
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
989
990
showard65db3932009-10-28 19:54:35 +0000991 def _schedule_special_tasks(self):
992 """
993 Execute queued SpecialTasks that are ready to run on idle hosts.
994 """
995 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000996 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000997 continue
showardd1195652009-12-08 22:21:02 +0000998 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000999
1000
showard170873e2009-01-07 00:22:26 +00001001 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +00001002 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +00001003 # should never happen
showarded2afea2009-07-07 20:54:07 +00001004 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +00001005 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +00001006 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +00001007 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +00001008 print_message=message)
mblighbb421852008-03-11 22:36:16 +00001009
1010
jadmanski0afbb632008-06-06 21:10:57 +00001011 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +00001012 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +00001013 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +00001014 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +00001015 if self.host_has_agent(host):
1016 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001017 continue
showard8cc058f2009-09-08 16:26:33 +00001018 if self._host_has_scheduled_special_task(host):
1019 # host will have a special task scheduled on the next cycle
1020 continue
showard170873e2009-01-07 00:22:26 +00001021 if print_message:
showardb18134f2009-03-20 20:52:18 +00001022 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001023 models.SpecialTask.objects.create(
1024 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001025 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001026
1027
jadmanski0afbb632008-06-06 21:10:57 +00001028 def _recover_hosts(self):
1029 # recover "Repair Failed" hosts
1030 message = 'Reverifying dead host %s'
1031 self._reverify_hosts_where("status = 'Repair Failed'",
1032 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001033
1034
showard04c82c52008-05-29 19:38:12 +00001035
    def _get_pending_queue_entries(self):
        """Return schedulable (Queued, inactive) HQEs in scheduling order."""
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001042
1043
showard89f84db2009-03-12 20:39:13 +00001044 def _refresh_pending_queue_entries(self):
1045 """
1046 Lookup the pending HostQueueEntries and call our HostScheduler
1047 refresh() method given that list. Return the list.
1048
1049 @returns A list of pending HostQueueEntries sorted in priority order.
1050 """
showard63a34772008-08-18 19:32:50 +00001051 queue_entries = self._get_pending_queue_entries()
1052 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001053 return []
showardb95b1bd2008-08-15 18:11:04 +00001054
showard63a34772008-08-18 19:32:50 +00001055 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001056
showard89f84db2009-03-12 20:39:13 +00001057 return queue_entries
1058
1059
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host; each clone
            # must be saved before a host can be attached to it.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001090
1091
showarda9545c02009-12-18 22:44:26 +00001092 def _schedule_hostless_job(self, queue_entry):
1093 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +00001094 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +00001095
1096
showard89f84db2009-03-12 20:39:13 +00001097 def _schedule_new_jobs(self):
1098 queue_entries = self._refresh_pending_queue_entries()
1099 if not queue_entries:
1100 return
1101
showard63a34772008-08-18 19:32:50 +00001102 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001103 is_unassigned_atomic_group = (
1104 queue_entry.atomic_group_id is not None
1105 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001106
1107 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001108 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001109 elif is_unassigned_atomic_group:
1110 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001111 else:
jamesren883492a2010-02-12 00:45:18 +00001112 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001113 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001114 assert assigned_host.id == queue_entry.host_id
1115 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001116
1117
showard8cc058f2009-09-08 16:26:33 +00001118 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001119 for agent_task in self._get_queue_entry_agent_tasks():
1120 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001121
1122
1123 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +00001124 for entry in scheduler_models.HostQueueEntry.fetch(
1125 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001126 task = entry.job.schedule_delayed_callback_task(entry)
1127 if task:
showardd1195652009-12-08 22:21:02 +00001128 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001129
1130
    def _run_queue_entry(self, queue_entry):
        # Kick off the pre-job special tasks (e.g. cleanup/verify) that must
        # complete before the entry itself executes.
        queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001133
1134
    def _find_aborting(self):
        """Abort all entries flagged aborted-but-incomplete, then give each
        affected job a chance to stop its remaining entries."""
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            # abort the running agents before aborting the entry itself
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        # deduplicated via the set: stop each affected job only once
        for job in jobs_to_stop:
            job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +00001146
1147
showard324bf812009-01-20 23:23:38 +00001148 def _can_start_agent(self, agent, num_started_this_cycle,
1149 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001150 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001151 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001152 return True
1153 # don't allow any nonzero-process agents to run after we've reached a
1154 # limit (this avoids starvation of many-process agents)
1155 if have_reached_limit:
1156 return False
1157 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001158 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001159 agent.task.owner_username,
1160 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001161 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001162 return False
1163 # if a single agent exceeds the per-cycle throttling, still allow it to
1164 # run when it's the first agent in the cycle
1165 if num_started_this_cycle == 0:
1166 return True
1167 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001168 if (num_started_this_cycle + agent.task.num_processes >
1169 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001170 return False
1171 return True
1172
1173
    def _handle_agents(self):
        """
        Tick every agent, starting not-yet-started ones subject to the
        process-throttling policy, and discard agents that finish.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is refused, refuse all further heavy ones
                    have_reached_limit = True
                    continue
                # only newly-started agents count against the cycle budget
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001191
1192
    def _process_recurring_runs(self):
        """
        Instantiate a job from every RecurringRun template whose start date
        has arrived, then reschedule (finite loop), keep (infinite loop), or
        delete (last iteration) each template.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # best-effort: log and continue with the other templates
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # final iteration: retire the template
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1232
1233
showard170873e2009-01-07 00:22:26 +00001234class PidfileRunMonitor(object):
1235 """
1236 Client must call either run() to start a new process or
1237 attach_to_existing_process().
1238 """
mbligh36768f02008-02-22 18:28:33 +00001239
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """
mbligh36768f02008-02-22 18:28:33 +00001245
1246
    def __init__(self):
        # True once the process is considered lost (set via on_lost_process
        # after a pidfile error).
        self.lost_process = False
        # Wall-clock time when run()/attach_to_existing_process() was called.
        self._start_time = None
        # Drone-manager handle for our pidfile; set by run()/attach.
        self.pidfile_id = None
        # Latest parsed pidfile contents (process, exit status, ...).
        self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001252
1253
showard170873e2009-01-07 00:22:26 +00001254 def _add_nice_command(self, command, nice_level):
1255 if not nice_level:
1256 return command
1257 return ['nice', '-n', str(nice_level)] + command
1258
1259
    def _set_start_time(self):
        # Timestamp when monitoring of the process began.
        self._start_time = time.time()
1262
1263
showard418785b2009-11-23 20:19:59 +00001264 def run(self, command, working_directory, num_processes, nice_level=None,
1265 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001266 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001267 assert command is not None
1268 if nice_level is not None:
1269 command = ['nice', '-n', str(nice_level)] + command
1270 self._set_start_time()
1271 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001272 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001273 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001274 paired_with_pidfile=paired_with_pidfile, username=username,
1275 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001276
1277
    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        """
        Monitor an already-running process via the pidfile it wrote, instead
        of launching a new one.

        @param execution_path: results path the pidfile lives under.
        @param pidfile_name: pidfile basename (defaults to autoserv's).
        @param num_processes: if given, declare the process count to the
                drone manager for accounting.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001286
1287
jadmanski0afbb632008-06-06 21:10:57 +00001288 def kill(self):
showard170873e2009-01-07 00:22:26 +00001289 if self.has_process():
1290 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001291
mbligh36768f02008-02-22 18:28:33 +00001292
    def has_process(self):
        """Refresh pidfile state; True once a process has been identified."""
        self._get_pidfile_info()
        return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001296
1297
    def get_process(self):
        """Return the monitored Process; only valid once has_process() is
        True (asserts otherwise)."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process
mblighbb421852008-03-11 22:36:16 +00001302
1303
showard170873e2009-01-07 00:22:26 +00001304 def _read_pidfile(self, use_second_read=False):
1305 assert self.pidfile_id is not None, (
1306 'You must call run() or attach_to_existing_process()')
1307 contents = _drone_manager.get_pidfile_contents(
1308 self.pidfile_id, use_second_read=use_second_read)
1309 if contents.is_invalid():
1310 self._state = drone_manager.PidfileContents()
1311 raise self._PidfileException(contents)
1312 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001313
1314
showard21baa452008-10-21 00:08:39 +00001315 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001316 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1317 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001318 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001319 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001320
1321
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, handling the races between a
        process exiting and its exit code landing in the pidfile.
        """
        # Once declared lost, self._state holds a synthesized failure result
        # that must not be overwritten by further pidfile reads.
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001344
showard21baa452008-10-21 00:08:39 +00001345
1346 def _get_pidfile_info(self):
1347 """\
1348 After completion, self._state will contain:
1349 pid=None, exit_status=None if autoserv has not yet run
1350 pid!=None, exit_status=None if autoserv is running
1351 pid!=None, exit_status!=None if autoserv has completed
1352 """
1353 try:
1354 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001355 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001356 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001357
1358
showard170873e2009-01-07 00:22:26 +00001359 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001360 """\
1361 Called when no pidfile is found or no pid is in the pidfile.
1362 """
showard170873e2009-01-07 00:22:26 +00001363 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001364 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001365 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001366 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001367 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001368
1369
showard35162b02009-03-03 02:17:30 +00001370 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001371 """\
1372 Called when autoserv has exited without writing an exit status,
1373 or we've timed out waiting for autoserv to write a pid to the
1374 pidfile. In either case, we just return failure and the caller
1375 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001376
showard170873e2009-01-07 00:22:26 +00001377 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001378 """
1379 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001380 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001381 self._state.exit_status = 1
1382 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001383
1384
jadmanski0afbb632008-06-06 21:10:57 +00001385 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001386 self._get_pidfile_info()
1387 return self._state.exit_status
1388
1389
1390 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001391 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001392 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001393 if self._state.num_tests_failed is None:
1394 return -1
showard21baa452008-10-21 00:08:39 +00001395 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001396
1397
showardcdaeae82009-08-31 18:32:48 +00001398 def try_copy_results_on_drone(self, **kwargs):
1399 if self.has_process():
1400 # copy results logs into the normal place for job results
1401 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1402
1403
1404 def try_copy_to_results_repository(self, source, **kwargs):
1405 if self.has_process():
1406 _drone_manager.copy_to_results_repository(self.get_process(),
1407 source, **kwargs)
1408
1409
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """

    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        # Mirror the task's ids so the Dispatcher can index agents by them.
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Poll the underlying task once, unless it has already finished."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        return self.finished


    def abort(self):
        """Forward an abort request to the task; tasks can choose to ignore it."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001463
showardd3dc1992009-04-22 21:01:40 +00001464
class AgentTask(object):
    """
    Base class for tasks run by an Agent.  Manages the lifecycle flags
    (started/done/success/aborted), launches and monitors the underlying
    autoserv process via a PidfileRunMonitor, and provides shared helpers
    for result copying, drone selection and pidfile bookkeeping.
    """

    class _NullMonitor(object):
        # Stand-in monitor used when a task has no paired predecessor
        # process; always reports that a process exists.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """
        Populate host_ids/queue_entry_ids from real queue entries when given,
        otherwise from a single host (host is then required).
        """
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # Lazily start on the first poll, then tick until done.
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitored process; finish the task once it has exited."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Record the final result and run the epilog (idempotent)."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the log file, or None if no log file was requested.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        # Preserve the command log in the results repository, if any.
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # Kill the process (if any), mark aborted and clean up logs.
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """
        Return the execution path shared by all entries; asserts they agree.
        """
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        # trailing slash copies the directory contents
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries off to the parser stage.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        # Hand the entries off to the archiver stage.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        # If the paired process vanished, email and fail instead of running.
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch this task's process through the drone manager."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username,
                drone_hostnames_allowed=self.get_drone_hostnames_allowed())


    def get_drone_hostnames_allowed(self):
        """
        Return the set of drone hostnames this task may run on, or None when
        drone sets are disabled (meaning no restriction).
        """
        if not models.DroneSet.drone_sets_enabled():
            return None

        hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
        if not hqes:
            # Only special tasks could be missing host queue entries
            assert isinstance(self, SpecialAgentTask)
            return self._user_or_global_default_drone_set(
                    self.task, self.task.requested_by)

        job_ids = hqes.values_list('job', flat=True).distinct()
        assert job_ids.count() == 1, ("AgentTask's queue entries "
                                      "span multiple jobs")

        job = models.Job.objects.get(id=job_ids[0])
        drone_set = job.drone_set
        if not drone_set:
            return self._user_or_global_default_drone_set(job, job.user())

        return drone_set.get_drone_hostnames()


    def _user_or_global_default_drone_set(self, obj_with_owner, user):
        """
        Returns the user's default drone set, if present.

        Otherwise, returns the global default drone set.
        """
        default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
        if not user:
            logging.warn('%s had no owner; using default drone set',
                         obj_with_owner)
            return default_hostnames
        if not user.drone_set:
            logging.warn('User %s has no default drone set, using global '
                         'default', user.login)
            return default_hostnames
        return user.drone_set.get_drone_hostnames()


    def register_necessary_pidfiles(self):
        """Register this task's pidfile (and its paired one) with the drone manager."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """
        Reattach to a process left over from a previous scheduler run; if no
        such process exists, reset so the task starts normally later.
        """
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """
        Raise SchedulerError if any entry (or its host, when
        allowed_host_statuses is given) is in an unexpected status.
        """
        class_name = self.__class__.__name__
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise SchedulerError('%s attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (class_name, entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise SchedulerError('%s attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001777
1778
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval entry as a 'key=value' line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval line to the job keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        lines = [self._format_keyval(field, value)]
        _drone_manager.write_lines_to_file(
                self._keyval_path(), lines,
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        queued_timestamp = int(time.mktime(job.created_on.timetuple()))
        return 'job_queued', queued_timestamp


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict, rendered as keyval lines, at keyval_path."""
        formatted_lines = [self._format_keyval(key, value)
                           for key, value in keyval_dict.iteritems()]
        # always end with a newline to allow additional keyvals to be written
        keyval_contents = '\n'.join(formatted_lines) + '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Record the host's platform and labels under host_keyvals/<hostname>."""
        host_keyval_path = os.path.join(self._working_directory(),
                                        'host_keyvals', host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyvals = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyvals, host_keyval_path)
1828
1829
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # Subclasses must set this to the models.SpecialTask.Task value they run.
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: a models.SpecialTask row to execute.
        @param extra_command_args: extra autoserv command-line arguments for
                this task type (e.g. ['-v'] for verify).
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        # Wrap the Django rows in scheduler-side objects.
        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        # May be None when the task was not requested by a specific user.
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        """Mark the SpecialTask active and record the host's keyvals."""
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """
        Fail this task's queue entry: write final keyvals, copy results into
        the entry's execution directory, and send it to parsing/archiving.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        """Finish the SpecialTask row, copy its results and release its pidfile."""
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001924
1925
class RepairTask(SpecialAgentTask):
    """Runs autoserv repair (-R) on a host; fails the queue entry on failure."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: the repair SpecialTask; its queue entry, if any, is
                marked failed should this repair fail.
        """
        # Translate the host's protection level into the attribute name
        # that autoserv expects on its command line.
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001962
1963
class PreJobTask(SpecialAgentTask):
    """
    Base for special tasks that run before a job (verify, cleanup).  On
    failure, preserves the autoserv debug log and either requeues the entry
    or schedules/records a repair.
    """

    def _copy_to_results_repository(self):
        # Only non-metahost entries keep the pre-job log with their results.
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        """
        On failure: save the debug log, then requeue the entry and schedule a
        repair -- unless the host is DO_NOT_VERIFY (failure ignored) or a
        repair for this entry already exists (entry is failed outright).
        """
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # A repair was already requested for this entry: give up on it.
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002013
showard8fe93b52008-11-18 17:53:22 +00002014
class VerifyTask(PreJobTask):
    """Runs autoserv verify (-v) on a host before its job."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any queued manual reverifies for this host.  One verify will do
        # and there's no need to keep records of other requests.
        self._delete_other_queued_verifies()


    def _delete_other_queued_verifies(self):
        # Drop pending, inactive, entry-less verify requests for this host
        # (excluding the one we are running).
        pending = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False, queue_entry=None)
        pending.exclude(id=self.task.id).delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002049
2050
class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        # recover_run_monitor is unused here; accepted only to keep the
        # call signature compatible with existing callers.
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup tied to a queue entry, either queue a
        verify or move the entry straight to pending."""
        if not self.queue_entry or not self.success:
            return

        verify_disallowed = (self.host.protection ==
                             host_protections.Protection.DO_NOT_VERIFY)
        if self.queue_entry.job.run_verify and not verify_disallowed:
            hqe = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
            models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=hqe,
                task=models.SpecialTask.Task.VERIFY)
        else:
            self.queue_entry.on_pending()


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2098
2099
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask

    Runs the job's control file via autoserv for a group of queue entries,
    writing job keyvals before and after the run and recording abort
    bookkeeping when the job is aborted.
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        # all entries belong to the same job; use the first as representative
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # keyval file lives in the shared execution directory of the entries
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution dir on the drone;
        returns the (drone-relative) path to the attached file."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line for running this job's control
        file against all non-hostless entries' hosts."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        if not self.job.is_server_job():
            # -c tells autoserv this is a client-side control file
            params.append('-c')

        return params


    @property
    def num_processes(self):
        # one autoserv process slot per queue entry
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all entries as RUNNING."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # leave a marker file so the parser/users can see the process was lost
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job-finished keyvals; no-op if the process never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        """Append an INFO line to the job's status.log on the drone."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, as keyvals and a status.log
        comment. Only runs if an autoserv process actually exists."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no recorded aborter: attribute to the system, timestamped now
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002232
2233
class QueueTask(AbstractQueueTask):
    """Runs a job's control file against one or more real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        # sanity-check entry/host states before touching anything
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING),
            allowed_host_statuses=(models.Host.Status.PENDING,
                                   models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        single_machine = (self.job.synch_count == 1
                          and len(self.queue_entries) == 1)
        if single_machine:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # hand the entries off to log gathering; hosts stay RUNNING
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002266
2267
class HostlessQueueTask(AbstractQueueTask):
    """Runs a job's control file with no hosts attached (server-side only)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # hostless entries get a fixed execution subdirectory
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # no hosts to gather logs from -- go straight to parsing
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002282
2283
class PostJobTask(AgentTask):
    """
    Abstract base for tasks that run after a job's autoserv process has
    finished (log gathering, parsing, archiving). Subclasses must implement
    _generate_command() and _pidfile_name().
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor attached to the *original* autoserv process, so we can
        # read its exit status / pidfile after the fact
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
            self._working_directory())


    def _command_line(self):
        if _testing_mode:
            # under test, run a trivial always-succeeding command
            return 'true'
        return self._generate_command(
            _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command list to run over results_dir; subclass hook."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        # pair drone files with the original autoserv process, not our own
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True/False if all entries agree on abort state; if they
        disagree, email a warning and conservatively return True."""
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state:\n' +
                    '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job outcome to a terminal HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2361
2362
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        # recover_run_monitor appears unused here -- presumably kept for
        # signature compatibility with callers; TODO confirm before removing
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command over all hosts."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        # one process slot per queue entry, matching the original job
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        # sanity-check entry/host states before starting
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
            allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Decide, per the job's reboot_after policy, whether each host gets
        a cleanup (reboot) or goes straight back to READY."""
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # no autoserv process found: treat as a failed run
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=queue_entry.host.id),
                    task=models.SpecialTask.Task.CLEANUP,
                    requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally: nothing to gather, finish immediately
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002448
2449
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.

    Subclasses define _max_processes(); instances defer launching their
    process until the class-wide running count drops below that limit.
    """
    # shared across all instances of each concrete subclass
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def tick(self):
        # until our process has launched, every tick is another chance to
        # start it; once started, revert to the default behavior
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledPostJobTask, self).tick()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002509
showard21baa452008-10-21 00:08:39 +00002510
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO parser over a finished job's results, throttled by
    max_parse_processes."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _generate_command(self, results_dir):
        flags = ['--write-pidfile', '-l', '2', '-r', '-o']
        return [_parser_path] + flags + [results_dir]


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002549
2550
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archiving control file over a parsed job's results,
    throttled by max_transfer_processes."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def _record_archiving_failure(self):
        # drop a marker file next to the results noting the archiver's exit
        failed_file = os.path.join(self._working_directory(),
                                   self._ARCHIVING_FAILED_FILE)
        _drone_manager.write_lines_to_file(
            failed_file, ['Archiving failed with exit code %s'
                          % self.monitor.exit_code()],
            paired_with_process=self._paired_with_monitor().get_process())


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            self._record_archiving_failure()
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002597
2598
if __name__ == '__main__':
    # Script entry point: delegate to main() (defined earlier in this file).
    main()