#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats, metahost_scheduler
from autotest_lib.scheduler import scheduler_models

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
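# With no site hook installed, _parser_path_default applies, so _parser_path
# resolves to os.path.join(AUTOTEST_INSTALL_DIR, 'tko', 'parse'), e.g.
# /usr/local/autotest/tko/parse under a hypothetical default install prefix.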


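# Example (hypothetical config values): with
#   [SCHEDULER]
#   pidfile_timeout_mins: 5
# in global_config.ini, the helper below returns 300 seconds.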
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda: ())


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise SchedulerError('Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why things the scheduler launches are
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

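# For illustration (hypothetical values): given machines='host1', extra_args=[]
# and a job with owner 'debug_user' and name 'sleeptest', the function above
# returns
#   [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#    '-m', 'host1', '-u', 'debug_user', '-l', 'sleeptest', '--verbose']

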
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors
    that affect eligibility.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))
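        # Illustrative only (hypothetical class name): a site that ships a
        # MySiteScheduler class in site_metahost_scheduler.py would enable it
        # with a global_config.ini entry like
        #   [SCHEDULER]
        #   site_metahost_schedulers: MySiteScheduler
        # and it would then take precedence over the default schedulers.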


    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


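    # For reference: _get_sql_id_list([1, 2, 3]) returns '1,2,3', ready to be
    # interpolated into an SQL "IN (%s)" clause.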
    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


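    # For reference: rows of (left, right) pairs such as
    # [(1, 10), (1, 11), (2, 10)] become {1: set([10, 11]), 2: set([10])};
    # with flip=True the mapping runs right-to-left instead.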
    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0
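        # i.e. a host is ACL-accessible iff the job's ACL groups and the
        # host's ACL groups share at least one member.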


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error (and still returns
        one of the ids) if more than one atomic group is found in the set of
        labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Look up the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask to run the given SpecialTask.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
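        # e.g. tasks queued as [VERIFY, REPAIR, CLEANUP] come back ordered
        # [REPAIR, CLEANUP, VERIFY]; sorted() is stable, so tasks of equal
        # priority keep their original queue order.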
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group; find a group and schedule its hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001174 if (num_started_this_cycle + agent.task.num_processes >
1175 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001176 return False
1177 return True
1178
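    # Worked example of the rules above, with hypothetical numbers: given
    # max_processes_started_per_cycle = 50 and pending agents needing 0, 40
    # and 30 processes, the 0-process agent always runs, the 40-process agent
    # runs because it is the first nonzero agent this cycle (subject still to
    # the drone manager's total-process limit), and the 30-process agent
    # waits, since 40 + 30 > 50 sets have_reached_limit and blocks any
    # further nonzero-process agents for the rest of the cycle.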
1179
jadmanski0afbb632008-06-06 21:10:57 +00001180 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001181 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001182 have_reached_limit = False
1183 # iterate over copy, so we can remove agents during iteration
1184 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001185 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001186 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001187 have_reached_limit):
1188 have_reached_limit = True
1189 continue
showardd1195652009-12-08 22:21:02 +00001190 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001191 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001192 if agent.is_done():
1193 logging.info("agent finished")
1194 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001195 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001196 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001197
1198
showard29f7cd22009-04-29 21:16:24 +00001199 def _process_recurring_runs(self):
1200 recurring_runs = models.RecurringRun.objects.filter(
1201 start_date__lte=datetime.datetime.now())
1202 for rrun in recurring_runs:
1203 # Create job from template
1204 job = rrun.job
1205 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001206 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001207
1208 host_objects = info['hosts']
1209 one_time_hosts = info['one_time_hosts']
1210 metahost_objects = info['meta_hosts']
1211 dependencies = info['dependencies']
1212 atomic_group = info['atomic_group']
1213
1214 for host in one_time_hosts or []:
1215 this_host = models.Host.create_one_time_host(host.hostname)
1216 host_objects.append(this_host)
1217
1218 try:
1219 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001220 options=options,
showard29f7cd22009-04-29 21:16:24 +00001221 host_objects=host_objects,
1222 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001223 atomic_group=atomic_group)
1224
1225 except Exception, ex:
1226 logging.exception(ex)
1227 # TODO: send email
1228
1229 if rrun.loop_count == 1:
1230 rrun.delete()
1231 else:
1232 if rrun.loop_count != 0: # if not infinite loop
1233 # calculate new start_date
1234 difference = datetime.timedelta(seconds=rrun.loop_period)
1235 rrun.start_date = rrun.start_date + difference
1236 rrun.loop_count -= 1
1237 rrun.save()
1238
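    # Example of the bookkeeping above, with hypothetical values: a
    # RecurringRun with loop_period=3600 and loop_count=3 is instantiated
    # whenever a tick finds its start_date in the past; each instantiation
    # pushes start_date forward one hour and decrements loop_count, and the
    # record is deleted once it runs with loop_count == 1. A loop_count of 0
    # marks an infinite loop: the record is never advanced or deleted.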
1239
showard170873e2009-01-07 00:22:26 +00001240class PidfileRunMonitor(object):
1241 """
1242 Client must call either run() to start a new process or
1243 attach_to_existing_process().
1244 """
mbligh36768f02008-02-22 18:28:33 +00001245
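    # A minimal usage sketch (names are hypothetical; in this file the command
    # line and working directory are supplied by AgentTask.run()):
    #     monitor = PidfileRunMonitor()
    #     monitor.run(command, working_directory, num_processes=1,
    #                 nice_level=AUTOSERV_NICE_LEVEL, username=owner)
    #     # ...or, to re-adopt a process after a scheduler restart:
    #     monitor = PidfileRunMonitor()
    #     monitor.attach_to_existing_process(working_directory)
    #     # monitor.exit_code() stays None while the process is running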
showard170873e2009-01-07 00:22:26 +00001246 class _PidfileException(Exception):
1247 """
1248 Raised when there's some unexpected behavior with the pid file, but only
1249 used internally (never allowed to escape this class).
1250 """
mbligh36768f02008-02-22 18:28:33 +00001251
1252
showard170873e2009-01-07 00:22:26 +00001253 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001254 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001255 self._start_time = None
1256 self.pidfile_id = None
1257 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001258
1259
showard170873e2009-01-07 00:22:26 +00001260 def _add_nice_command(self, command, nice_level):
1261 if not nice_level:
1262 return command
1263 return ['nice', '-n', str(nice_level)] + command
1264
1265
1266 def _set_start_time(self):
1267 self._start_time = time.time()
1268
1269
showard418785b2009-11-23 20:19:59 +00001270 def run(self, command, working_directory, num_processes, nice_level=None,
1271 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001272 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001273 assert command is not None
1274 if nice_level is not None:
1275 command = ['nice', '-n', str(nice_level)] + command
1276 self._set_start_time()
1277 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001278 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001279 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001280 paired_with_pidfile=paired_with_pidfile, username=username,
1281 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001282
1283
showarded2afea2009-07-07 20:54:07 +00001284 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001285 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001286 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001287 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001288 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001289 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001290 if num_processes is not None:
1291 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001292
1293
jadmanski0afbb632008-06-06 21:10:57 +00001294 def kill(self):
showard170873e2009-01-07 00:22:26 +00001295 if self.has_process():
1296 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001297
mbligh36768f02008-02-22 18:28:33 +00001298
showard170873e2009-01-07 00:22:26 +00001299 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001300 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001301 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001302
1303
showard170873e2009-01-07 00:22:26 +00001304 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001305 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001306 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001307 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001308
1309
showard170873e2009-01-07 00:22:26 +00001310 def _read_pidfile(self, use_second_read=False):
1311 assert self.pidfile_id is not None, (
1312 'You must call run() or attach_to_existing_process()')
1313 contents = _drone_manager.get_pidfile_contents(
1314 self.pidfile_id, use_second_read=use_second_read)
1315 if contents.is_invalid():
1316 self._state = drone_manager.PidfileContents()
1317 raise self._PidfileException(contents)
1318 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001319
1320
showard21baa452008-10-21 00:08:39 +00001321 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001322 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1323 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001324 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001325 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001326
1327
1328 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001329 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001330 return
mblighbb421852008-03-11 22:36:16 +00001331
showard21baa452008-10-21 00:08:39 +00001332 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001333
showard170873e2009-01-07 00:22:26 +00001334 if self._state.process is None:
1335 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001336 return
mbligh90a549d2008-03-25 23:52:34 +00001337
showard21baa452008-10-21 00:08:39 +00001338 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001339 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001340 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001341 return
mbligh90a549d2008-03-25 23:52:34 +00001342
showard170873e2009-01-07 00:22:26 +00001343 # pid but no running process - maybe process *just* exited
1344 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001345 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001346 # autoserv exited without writing an exit code
1347 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001348 self._handle_pidfile_error(
1349 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001350
showard21baa452008-10-21 00:08:39 +00001351
1352 def _get_pidfile_info(self):
1353 """\
1354 After completion, self._state will contain:
1355 process=None, exit_status=None if autoserv has not yet run
1356 process!=None, exit_status=None if autoserv is running
1357 process!=None, exit_status!=None if autoserv has completed
1358 """
1359 try:
1360 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001361 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001362 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001363
1364
showard170873e2009-01-07 00:22:26 +00001365 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001366 """\
1367 Called when no pidfile is found or no pid is in the pidfile.
1368 """
showard170873e2009-01-07 00:22:26 +00001369 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001370 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001371 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001372 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001373 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001374
1375
showard35162b02009-03-03 02:17:30 +00001376 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001377 """\
1378 Called when autoserv has exited without writing an exit status,
1379 or we've timed out waiting for autoserv to write a pid to the
1380 pidfile. In either case, we just return failure and the caller
1381 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001382
showard170873e2009-01-07 00:22:26 +00001383 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001384 """
1385 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001386 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001387 self._state.exit_status = 1
1388 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001389
1390
jadmanski0afbb632008-06-06 21:10:57 +00001391 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001392 self._get_pidfile_info()
1393 return self._state.exit_status
1394
1395
1396 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001397 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001398 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001399 if self._state.num_tests_failed is None:
1400 return -1
showard21baa452008-10-21 00:08:39 +00001401 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001402
1403
showardcdaeae82009-08-31 18:32:48 +00001404 def try_copy_results_on_drone(self, **kwargs):
1405 if self.has_process():
1406 # copy results logs into the normal place for job results
1407 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1408
1409
1410 def try_copy_to_results_repository(self, source, **kwargs):
1411 if self.has_process():
1412 _drone_manager.copy_to_results_repository(self.get_process(),
1413 source, **kwargs)
1414
1415
mbligh36768f02008-02-22 18:28:33 +00001416class Agent(object):
showard77182562009-06-10 00:16:05 +00001417 """
showard8cc058f2009-09-08 16:26:33 +00001418 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001419
1420 The following methods are required on all task objects:
1421 poll() - Called periodically to let the task check its status and
1422 update its internal state.
1423 is_done() - Returns True if the task is finished.
1424 abort() - Called when an abort has been requested. The task must
1425 set its aborted attribute to True if it actually aborted.
1426
1427 The following attributes are required on all task objects:
1428 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001429 success - bool, True if this task succeeded.
1430 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1431 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001432 """
1433
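    # A minimal object satisfying this interface (an illustrative sketch, not
    # a real task; concrete tasks subclass AgentTask below, and the dispatcher
    # additionally reads task.num_processes for throttling):
    #     class NoopTask(object):
    #         queue_entry_ids, host_ids = (), ()
    #         num_processes = 0
    #         aborted, success = False, None
    #         def poll(self):
    #             self.success = True
    #         def is_done(self):
    #             return self.success is not None
    #         def abort(self):
    #             self.aborted = True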
1434
showard418785b2009-11-23 20:19:59 +00001435 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001436 """
showard8cc058f2009-09-08 16:26:33 +00001437 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001438 """
showard8cc058f2009-09-08 16:26:33 +00001439 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001440
showard77182562009-06-10 00:16:05 +00001441 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001442 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001443
showard8cc058f2009-09-08 16:26:33 +00001444 self.queue_entry_ids = task.queue_entry_ids
1445 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001446
showard8cc058f2009-09-08 16:26:33 +00001447 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001448 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001449
1450
jadmanski0afbb632008-06-06 21:10:57 +00001451 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001452 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001453 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001454 self.task.poll()
1455 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001456 self.finished = True
showardec113162008-05-08 00:52:49 +00001457
1458
jadmanski0afbb632008-06-06 21:10:57 +00001459 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001460 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001461
1462
showardd3dc1992009-04-22 21:01:40 +00001463 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001464 if self.task:
1465 self.task.abort()
1466 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001467 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001468 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001469
showardd3dc1992009-04-22 21:01:40 +00001470
mbligh36768f02008-02-22 18:28:33 +00001471class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001472 class _NullMonitor(object):
1473 pidfile_id = None
1474
1475 def has_process(self):
1476 return True
1477
1478
1479 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001480 """
showardd1195652009-12-08 22:21:02 +00001481 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001482 """
jadmanski0afbb632008-06-06 21:10:57 +00001483 self.done = False
showardd1195652009-12-08 22:21:02 +00001484 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001485 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001486 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001487 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001488 self.queue_entry_ids = []
1489 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001490 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001491
1492
1493 def _set_ids(self, host=None, queue_entries=None):
1494 if queue_entries and queue_entries != [None]:
1495 self.host_ids = [entry.host.id for entry in queue_entries]
1496 self.queue_entry_ids = [entry.id for entry in queue_entries]
1497 else:
1498 assert host
1499 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001500
1501
jadmanski0afbb632008-06-06 21:10:57 +00001502 def poll(self):
showard08a36412009-05-05 01:01:13 +00001503 if not self.started:
1504 self.start()
showardd1195652009-12-08 22:21:02 +00001505 if not self.done:
1506 self.tick()
showard08a36412009-05-05 01:01:13 +00001507
1508
1509 def tick(self):
showardd1195652009-12-08 22:21:02 +00001510 assert self.monitor
1511 exit_code = self.monitor.exit_code()
1512 if exit_code is None:
1513 return
mbligh36768f02008-02-22 18:28:33 +00001514
showardd1195652009-12-08 22:21:02 +00001515 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001516 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001517
1518
jadmanski0afbb632008-06-06 21:10:57 +00001519 def is_done(self):
1520 return self.done
mbligh36768f02008-02-22 18:28:33 +00001521
1522
jadmanski0afbb632008-06-06 21:10:57 +00001523 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001524 if self.done:
showardd1195652009-12-08 22:21:02 +00001525 assert self.started
showard08a36412009-05-05 01:01:13 +00001526 return
showardd1195652009-12-08 22:21:02 +00001527 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001528 self.done = True
1529 self.success = success
1530 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001531
1532
jadmanski0afbb632008-06-06 21:10:57 +00001533 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001534 """
1535 To be overridden.
1536 """
showarded2afea2009-07-07 20:54:07 +00001537 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001538 self.register_necessary_pidfiles()
1539
1540
1541 def _log_file(self):
1542 if not self._log_file_name:
1543 return None
1544 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001545
mbligh36768f02008-02-22 18:28:33 +00001546
jadmanski0afbb632008-06-06 21:10:57 +00001547 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001548 log_file = self._log_file()
1549 if self.monitor and log_file:
1550 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001551
1552
jadmanski0afbb632008-06-06 21:10:57 +00001553 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001554 """
1555 To be overridden.
1556 """
jadmanski0afbb632008-06-06 21:10:57 +00001557 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001558 logging.info("%s finished with success=%s", type(self).__name__,
1559 self.success)
1560
mbligh36768f02008-02-22 18:28:33 +00001561
1562
jadmanski0afbb632008-06-06 21:10:57 +00001563 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001564 if not self.started:
1565 self.prolog()
1566 self.run()
1567
1568 self.started = True
1569
1570
1571 def abort(self):
1572 if self.monitor:
1573 self.monitor.kill()
1574 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001575 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001576 self.cleanup()
1577
1578
showarded2afea2009-07-07 20:54:07 +00001579 def _get_consistent_execution_path(self, execution_entries):
1580 first_execution_path = execution_entries[0].execution_path()
1581 for execution_entry in execution_entries[1:]:
1582 assert execution_entry.execution_path() == first_execution_path, (
1583 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1584 execution_entry,
1585 first_execution_path,
1586 execution_entries[0]))
1587 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001588
1589
showarded2afea2009-07-07 20:54:07 +00001590 def _copy_results(self, execution_entries, use_monitor=None):
1591 """
1592 @param execution_entries: list of objects with execution_path() method
1593 """
showard6d1c1432009-08-20 23:30:39 +00001594 if use_monitor is not None and not use_monitor.has_process():
1595 return
1596
showarded2afea2009-07-07 20:54:07 +00001597 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001598 if use_monitor is None:
1599 assert self.monitor
1600 use_monitor = self.monitor
1601 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001602 execution_path = self._get_consistent_execution_path(execution_entries)
1603 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001604 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001605
showarda1e74b32009-05-12 17:32:04 +00001606
1607 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001608 for queue_entry in queue_entries:
1609 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001610
1611
mbligh4608b002010-01-05 18:22:35 +00001612 def _archive_results(self, queue_entries):
1613 for queue_entry in queue_entries:
1614 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001615
1616
showardd1195652009-12-08 22:21:02 +00001617 def _command_line(self):
1618 """
1619 Return the command line to run. Must be overridden.
1620 """
1621 raise NotImplementedError
1622
1623
1624 @property
1625 def num_processes(self):
1626 """
1627 Return the number of processes forked by this AgentTask's process. It
1628 may only be approximate. To be overridden if necessary.
1629 """
1630 return 1
1631
1632
1633 def _paired_with_monitor(self):
1634 """
1635 If this AgentTask's process must run on the same machine as some
1636 previous process, this method should be overridden to return a
1637 PidfileRunMonitor for that process.
1638 """
1639 return self._NullMonitor()
1640
1641
1642 @property
1643 def owner_username(self):
1644 """
1645 Return login of user responsible for this task. May be None. Must be
1646 overridden.
1647 """
1648 raise NotImplementedError
1649
1650
1651 def _working_directory(self):
1652 """
1653 Return the directory where this AgentTask's process executes. Must be
1654 overridden.
1655 """
1656 raise NotImplementedError
1657
1658
1659 def _pidfile_name(self):
1660 """
1661 Return the name of the pidfile this AgentTask's process uses. To be
1662 overridden if necessary.
1663 """
jamesrenc44ae992010-02-19 00:12:54 +00001664 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001665
1666
1667 def _check_paired_results_exist(self):
1668 if not self._paired_with_monitor().has_process():
1669 email_manager.manager.enqueue_notify_email(
1670 'No paired results in task',
1671 'No paired results in task %s at %s'
1672 % (self, self._paired_with_monitor().pidfile_id))
1673 self.finished(False)
1674 return False
1675 return True
1676
1677
1678 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001679 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001680 self.monitor = PidfileRunMonitor()
1681
1682
1683 def run(self):
1684 if not self._check_paired_results_exist():
1685 return
1686
1687 self._create_monitor()
1688 self.monitor.run(
1689 self._command_line(), self._working_directory(),
1690 num_processes=self.num_processes,
1691 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1692 pidfile_name=self._pidfile_name(),
1693 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001694 username=self.owner_username,
1695 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1696
1697
1698 def get_drone_hostnames_allowed(self):
1699 if not models.DroneSet.drone_sets_enabled():
1700 return None
1701
1702 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1703 if not hqes:
1704 # Only special tasks could be missing host queue entries
1705 assert isinstance(self, SpecialAgentTask)
1706 return self._user_or_global_default_drone_set(
1707 self.task, self.task.requested_by)
1708
1709 job_ids = hqes.values_list('job', flat=True).distinct()
1710 assert job_ids.count() == 1, ("AgentTask's queue entries "
1711 "span multiple jobs")
1712
1713 job = models.Job.objects.get(id=job_ids[0])
1714 drone_set = job.drone_set
1715 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001716 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001717
1718 return drone_set.get_drone_hostnames()
1719
1720
1721 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1722 """
1723 Returns the drone hostnames of the user's default drone set, if set.
1724
1725 Otherwise, returns the hostnames of the global default drone set.
1726 """
1727 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1728 if not user:
1729 logging.warn('%s had no owner; using default drone set',
1730 obj_with_owner)
1731 return default_hostnames
1732 if not user.drone_set:
1733 logging.warn('User %s has no default drone set, using global '
1734 'default', user.login)
1735 return default_hostnames
1736 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001737
1738
1739 def register_necessary_pidfiles(self):
1740 pidfile_id = _drone_manager.get_pidfile_id_from(
1741 self._working_directory(), self._pidfile_name())
1742 _drone_manager.register_pidfile(pidfile_id)
1743
1744 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1745 if paired_pidfile_id:
1746 _drone_manager.register_pidfile(paired_pidfile_id)
1747
1748
1749 def recover(self):
1750 if not self._check_paired_results_exist():
1751 return
1752
1753 self._create_monitor()
1754 self.monitor.attach_to_existing_process(
1755 self._working_directory(), pidfile_name=self._pidfile_name(),
1756 num_processes=self.num_processes)
1757 if not self.monitor.has_process():
1758 # no process to recover; wait to be started normally
1759 self.monitor = None
1760 return
1761
1762 self.started = True
1763 logging.info('Recovering process %s for %s at %s'
1764 % (self.monitor.get_process(), type(self).__name__,
1765 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001766
1767
mbligh4608b002010-01-05 18:22:35 +00001768 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1769 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001770 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001771 for entry in queue_entries:
1772 if entry.status not in allowed_hqe_statuses:
jamesrenb8f3f352010-06-10 00:44:06 +00001773 raise SchedulerError('%s attempting to start '
mbligh4608b002010-01-05 18:22:35 +00001774 'entry with invalid status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001775 % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001776 invalid_host_status = (
1777 allowed_host_statuses is not None
1778 and entry.host.status not in allowed_host_statuses)
1779 if invalid_host_status:
jamesrenb8f3f352010-06-10 00:44:06 +00001780 raise SchedulerError('%s attempting to start on queue '
mbligh4608b002010-01-05 18:22:35 +00001781 'entry with invalid host status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001782 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001783
1784
showardd9205182009-04-27 20:09:55 +00001785class TaskWithJobKeyvals(object):
1786 """AgentTask mixin providing functionality to help with job keyval files."""
1787 _KEYVAL_FILE = 'keyval'
1788 def _format_keyval(self, key, value):
1789 return '%s=%s' % (key, value)
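        # e.g. _format_keyval('job_queued', 1285774380) yields the line
        # 'job_queued=1285774380' in the job's keyval file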
1790
1791
1792 def _keyval_path(self):
1793 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001794 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001795
1796
1797 def _write_keyval_after_job(self, field, value):
1798 assert self.monitor
1799 if not self.monitor.has_process():
1800 return
1801 _drone_manager.write_lines_to_file(
1802 self._keyval_path(), [self._format_keyval(field, value)],
1803 paired_with_process=self.monitor.get_process())
1804
1805
1806 def _job_queued_keyval(self, job):
1807 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1808
1809
1810 def _write_job_finished(self):
1811 self._write_keyval_after_job("job_finished", int(time.time()))
1812
1813
showarddb502762009-09-09 15:31:20 +00001814 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1815 keyval_contents = '\n'.join(self._format_keyval(key, value)
1816 for key, value in keyval_dict.iteritems())
1817 # always end with a newline to allow additional keyvals to be written
1818 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001819 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001820 keyval_contents,
1821 file_path=keyval_path)
1822
1823
1824 def _write_keyvals_before_job(self, keyval_dict):
1825 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1826
1827
1828 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001829 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001830 host.hostname)
1831 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001832 all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001833 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1834 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1835
1836
showard8cc058f2009-09-08 16:26:33 +00001837class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001838 """
1839 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1840 """
1841
1842 TASK_TYPE = None
1843 host = None
1844 queue_entry = None
1845
showardd1195652009-12-08 22:21:02 +00001846 def __init__(self, task, extra_command_args):
1847 super(SpecialAgentTask, self).__init__()
1848
lmrb7c5d272010-04-16 06:34:04 +00001849 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001850
jamesrenc44ae992010-02-19 00:12:54 +00001851 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001852 self.queue_entry = None
1853 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001854 self.queue_entry = scheduler_models.HostQueueEntry(
1855 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001856
showarded2afea2009-07-07 20:54:07 +00001857 self.task = task
1858 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001859
1860
showard8cc058f2009-09-08 16:26:33 +00001861 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001862 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1863
1864
1865 def _command_line(self):
1866 return _autoserv_command_line(self.host.hostname,
1867 self._extra_command_args,
1868 queue_entry=self.queue_entry)
1869
1870
1871 def _working_directory(self):
1872 return self.task.execution_path()
1873
1874
1875 @property
1876 def owner_username(self):
1877 if self.task.requested_by:
1878 return self.task.requested_by.login
1879 return None
showard8cc058f2009-09-08 16:26:33 +00001880
1881
showarded2afea2009-07-07 20:54:07 +00001882 def prolog(self):
1883 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001884 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001885 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001886
1887
showardde634ee2009-01-30 01:44:24 +00001888 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001889 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001890
showard2fe3f1d2009-07-06 20:19:11 +00001891 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001892 return # don't fail metahost entries, they'll be reassigned
1893
showard2fe3f1d2009-07-06 20:19:11 +00001894 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001895 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001896 return # entry has been aborted
1897
showard2fe3f1d2009-07-06 20:19:11 +00001898 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001899 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001900 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001901 self._write_keyval_after_job(queued_key, queued_time)
1902 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001903
showard8cc058f2009-09-08 16:26:33 +00001904 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001905 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001906 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001907 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001908
showard8cc058f2009-09-08 16:26:33 +00001909 pidfile_id = _drone_manager.get_pidfile_id_from(
1910 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001911 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001912 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001913
1914 if self.queue_entry.job.parse_failed_repair:
1915 self._parse_results([self.queue_entry])
1916 else:
1917 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001918
1919
1920 def cleanup(self):
1921 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001922
1923 # We will consider an aborted task to be "Failed"
1924 self.task.finish(bool(self.success))
1925
showardf85a0b72009-10-07 20:48:45 +00001926 if self.monitor:
1927 if self.monitor.has_process():
1928 self._copy_results([self.task])
1929 if self.monitor.pidfile_id is not None:
1930 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001931
1932
1933class RepairTask(SpecialAgentTask):
1934 TASK_TYPE = models.SpecialTask.Task.REPAIR
1935
1936
showardd1195652009-12-08 22:21:02 +00001937 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001938 """\
1939 task: the SpecialTask to run; its queue entry, if any, is marked failed if this repair fails.
1940 """
1941 protection = host_protections.Protection.get_string(
1942 task.host.protection)
1943 # normalize the protection name
1944 protection = host_protections.Protection.get_attr_name(protection)
1945
1946 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001947 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001948
1949 # *don't* include the queue entry in IDs -- if the queue entry is
1950 # aborted, we want to leave the repair task running
1951 self._set_ids(host=self.host)
1952
1953
1954 def prolog(self):
1955 super(RepairTask, self).prolog()
1956 logging.info("repair_task starting")
1957 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001958
1959
jadmanski0afbb632008-06-06 21:10:57 +00001960 def epilog(self):
1961 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001962
jadmanski0afbb632008-06-06 21:10:57 +00001963 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001964 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001965 else:
showard8cc058f2009-09-08 16:26:33 +00001966 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001967 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001968 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001969
1970
showarded2afea2009-07-07 20:54:07 +00001971class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001972 def _copy_to_results_repository(self):
1973 if not self.queue_entry or self.queue_entry.meta_host:
1974 return
1975
1976 self.queue_entry.set_execution_subdir()
1977 log_name = os.path.basename(self.task.execution_path())
1978 source = os.path.join(self.task.execution_path(), 'debug',
1979 'autoserv.DEBUG')
1980 destination = os.path.join(
1981 self.queue_entry.execution_path(), log_name)
1982
1983 self.monitor.try_copy_to_results_repository(
1984 source, destination_path=destination)
1985
1986
showard170873e2009-01-07 00:22:26 +00001987 def epilog(self):
1988 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001989
showard775300b2009-09-09 15:30:50 +00001990 if self.success:
1991 return
showard8fe93b52008-11-18 17:53:22 +00001992
showard775300b2009-09-09 15:30:50 +00001993 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001994
showard775300b2009-09-09 15:30:50 +00001995 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001996 # effectively ignore failure for these hosts
1997 self.success = True
showard775300b2009-09-09 15:30:50 +00001998 return
1999
2000 if self.queue_entry:
2001 self.queue_entry.requeue()
2002
2003 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00002004 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00002005 queue_entry__id=self.queue_entry.id):
2006 self.host.set_status(models.Host.Status.REPAIR_FAILED)
2007 self._fail_queue_entry()
2008 return
2009
showard9bb960b2009-11-19 01:02:11 +00002010 queue_entry = models.HostQueueEntry.objects.get(
2011 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002012 else:
2013 queue_entry = None
2014
2015 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002016 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002017 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002018 queue_entry=queue_entry,
2019 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002020
showard8fe93b52008-11-18 17:53:22 +00002021
2022class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002023 TASK_TYPE = models.SpecialTask.Task.VERIFY
2024
2025
showardd1195652009-12-08 22:21:02 +00002026 def __init__(self, task):
2027 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00002028 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00002029
2030
jadmanski0afbb632008-06-06 21:10:57 +00002031 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002032 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00002033
showardb18134f2009-03-20 20:52:18 +00002034 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002035 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00002036 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2037 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00002038
jamesren42318f72010-05-10 23:40:59 +00002039 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00002040 # and there's no need to keep records of other requests.
2041 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00002042 host__id=self.host.id,
2043 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00002044 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00002045 queued_verifies = queued_verifies.exclude(id=self.task.id)
2046 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00002047
mbligh36768f02008-02-22 18:28:33 +00002048
jadmanski0afbb632008-06-06 21:10:57 +00002049 def epilog(self):
2050 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002051 if self.success:
showard8cc058f2009-09-08 16:26:33 +00002052 if self.queue_entry:
2053 self.queue_entry.on_pending()
2054 else:
2055 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002056
2057
mbligh4608b002010-01-05 18:22:35 +00002058class CleanupTask(PreJobTask):
2059 # note this can also run post-job, but when it does, it's running standalone
2060 # against the host (not related to the job), so it's not considered a
2061 # PostJobTask
2062
2063 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2064
2065
2066 def __init__(self, task, recover_run_monitor=None):
2067 super(CleanupTask, self).__init__(task, ['--cleanup'])
2068 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2069
2070
2071 def prolog(self):
2072 super(CleanupTask, self).prolog()
2073 logging.info("starting cleanup task for host: %s", self.host.hostname)
2074 self.host.set_status(models.Host.Status.CLEANING)
2075 if self.queue_entry:
2076 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2077
2078
2079 def _finish_epilog(self):
2080 if not self.queue_entry or not self.success:
2081 return
2082
2083 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2084 should_run_verify = (
2085 self.queue_entry.job.run_verify
2086 and self.host.protection != do_not_verify_protection)
2087 if should_run_verify:
2088 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2089 models.SpecialTask.objects.create(
2090 host=models.Host.objects.get(id=self.host.id),
2091 queue_entry=entry,
2092 task=models.SpecialTask.Task.VERIFY)
2093 else:
2094 self.queue_entry.on_pending()
2095
2096
2097 def epilog(self):
2098 super(CleanupTask, self).epilog()
2099
2100 if self.success:
2101 self.host.update_field('dirty', 0)
2102 self.host.set_status(models.Host.Status.READY)
2103
2104 self._finish_epilog()
2105
2106
showarda9545c02009-12-18 22:44:26 +00002107class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2108 """
2109 Common functionality for QueueTask and HostlessQueueTask
2110 """
2111 def __init__(self, queue_entries):
2112 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002113 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002114 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002115
2116
showard73ec0442009-02-07 02:05:20 +00002117 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002118 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002119
2120
jamesrenc44ae992010-02-19 00:12:54 +00002121 def _write_control_file(self, execution_path):
2122 control_path = _drone_manager.attach_file_to_execution(
2123 execution_path, self.job.control_file)
2124 return control_path
2125
2126
showardd1195652009-12-08 22:21:02 +00002127 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002128 execution_path = self.queue_entries[0].execution_path()
2129 control_path = self._write_control_file(execution_path)
2130 hostnames = ','.join(entry.host.hostname
2131 for entry in self.queue_entries
2132 if not entry.is_hostless())
2133
2134 execution_tag = self.queue_entries[0].execution_tag()
2135 params = _autoserv_command_line(
2136 hostnames,
2137 ['-P', execution_tag, '-n',
2138 _drone_manager.absolute_path(control_path)],
2139 job=self.job, verbose=False)
2140
2141 if not self.job.is_server_job():
2142 params.append('-c')
2143
2144 return params
showardd1195652009-12-08 22:21:02 +00002145
2146
2147 @property
2148 def num_processes(self):
2149 return len(self.queue_entries)
2150
2151
2152 @property
2153 def owner_username(self):
2154 return self.job.owner
2155
2156
2157 def _working_directory(self):
2158 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002159
2160
jadmanski0afbb632008-06-06 21:10:57 +00002161 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002162 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002163 keyval_dict = self.job.keyval_dict()
2164 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002165 group_name = self.queue_entries[0].get_group_name()
2166 if group_name:
2167 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002168 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002169 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002170 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002171 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002172
2173
showard35162b02009-03-03 02:17:30 +00002174 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002175 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002176 _drone_manager.write_lines_to_file(error_file_path,
2177 [_LOST_PROCESS_ERROR])
2178
2179
showardd3dc1992009-04-22 21:01:40 +00002180 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002181 if not self.monitor:
2182 return
2183
showardd9205182009-04-27 20:09:55 +00002184 self._write_job_finished()
2185
showard35162b02009-03-03 02:17:30 +00002186 if self.monitor.lost_process:
2187 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002188
jadmanskif7fa2cc2008-10-01 14:13:23 +00002189
showardcbd74612008-11-19 21:42:02 +00002190 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002191 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002192 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002193 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002194 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002195
2196
jadmanskif7fa2cc2008-10-01 14:13:23 +00002197 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002198 if not self.monitor or not self.monitor.has_process():
2199 return
2200
jadmanskif7fa2cc2008-10-01 14:13:23 +00002201 # build up sets of all the aborted_by and aborted_on values
2202 aborted_by, aborted_on = set(), set()
2203 for queue_entry in self.queue_entries:
2204 if queue_entry.aborted_by:
2205 aborted_by.add(queue_entry.aborted_by)
2206 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2207 aborted_on.add(t)
2208
2209 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002210 # TODO(showard): this conditional is now obsolete, we just need to leave
2211 # it in temporarily for backwards compatibility over upgrades. delete
2212 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002213 assert len(aborted_by) <= 1
2214 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002215 aborted_by_value = aborted_by.pop()
2216 aborted_on_value = max(aborted_on)
2217 else:
2218 aborted_by_value = 'autotest_system'
2219 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002220
showarda0382352009-02-11 23:36:43 +00002221 self._write_keyval_after_job("aborted_by", aborted_by_value)
2222 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002223
showardcbd74612008-11-19 21:42:02 +00002224 aborted_on_string = str(datetime.datetime.fromtimestamp(
2225 aborted_on_value))
2226 self._write_status_comment('Job aborted by %s on %s' %
2227 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002228
2229
jadmanski0afbb632008-06-06 21:10:57 +00002230 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002231 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002232 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002233 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002234
2235
jadmanski0afbb632008-06-06 21:10:57 +00002236 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002237 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002238 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002239
2240
2241class QueueTask(AbstractQueueTask):
2242 def __init__(self, queue_entries):
2243 super(QueueTask, self).__init__(queue_entries)
2244 self._set_ids(queue_entries=queue_entries)
2245
2246
2247 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002248 self._check_queue_entry_statuses(
2249 self.queue_entries,
2250 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2251 models.HostQueueEntry.Status.RUNNING),
2252 allowed_host_statuses=(models.Host.Status.PENDING,
2253 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002254
2255 super(QueueTask, self).prolog()
2256
2257 for queue_entry in self.queue_entries:
2258 self._write_host_keyvals(queue_entry.host)
2259 queue_entry.host.set_status(models.Host.Status.RUNNING)
2260 queue_entry.host.update_field('dirty', 1)
2261 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2262 # TODO(gps): Remove this if nothing needs it anymore.
2263 # A potential user is: tko/parser
2264 self.job.write_to_machines_file(self.queue_entries[0])
2265
2266
2267 def _finish_task(self):
2268 super(QueueTask, self)._finish_task()
2269
2270 for queue_entry in self.queue_entries:
2271 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002272 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002273
2274
mbligh4608b002010-01-05 18:22:35 +00002275class HostlessQueueTask(AbstractQueueTask):
2276 def __init__(self, queue_entry):
2277 super(HostlessQueueTask, self).__init__([queue_entry])
2278 self.queue_entry_ids = [queue_entry.id]
2279
2280
2281 def prolog(self):
2282 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2283 super(HostlessQueueTask, self).prolog()
2284
2285
mbligh4608b002010-01-05 18:22:35 +00002286 def _finish_task(self):
2287 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002288 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002289
2290
showardd3dc1992009-04-22 21:01:40 +00002291class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002292 def __init__(self, queue_entries, log_file_name):
2293 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002294
showardd1195652009-12-08 22:21:02 +00002295 self.queue_entries = queue_entries
2296
showardd3dc1992009-04-22 21:01:40 +00002297 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002298 self._autoserv_monitor.attach_to_existing_process(
2299 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002300
showardd1195652009-12-08 22:21:02 +00002301
2302 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002303 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002304 return 'true'
2305 return self._generate_command(
2306 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002307
2308
2309 def _generate_command(self, results_dir):
2310 raise NotImplementedError('Subclasses must override this')
2311
2312
showardd1195652009-12-08 22:21:02 +00002313 @property
2314 def owner_username(self):
2315 return self.queue_entries[0].job.owner
2316
2317
2318 def _working_directory(self):
2319 return self._get_consistent_execution_path(self.queue_entries)
2320
2321
2322 def _paired_with_monitor(self):
2323 return self._autoserv_monitor
2324
2325
showardd3dc1992009-04-22 21:01:40 +00002326 def _job_was_aborted(self):
2327 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002328 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002329 queue_entry.update_from_database()
2330 if was_aborted is None: # first queue entry
2331 was_aborted = bool(queue_entry.aborted)
2332 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002333 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2334 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002335 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002336 'Inconsistent abort state',
2337 'Queue entries have inconsistent abort state:\n' +
2338 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002339 # don't crash here, just assume true
2340 return True
2341 return was_aborted
2342
2343
showardd1195652009-12-08 22:21:02 +00002344 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002345 if self._job_was_aborted():
2346 return models.HostQueueEntry.Status.ABORTED
2347
2348 # we'll use a PidfileRunMonitor to read the autoserv exit status
2349 if self._autoserv_monitor.exit_code() == 0:
2350 return models.HostQueueEntry.Status.COMPLETED
2351 return models.HostQueueEntry.Status.FAILED
2352
2353
showardd3dc1992009-04-22 21:01:40 +00002354 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002355 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002356 queue_entry.set_status(status)
2357
2358
2359 def abort(self):
2360 # override AgentTask.abort() to avoid killing the process and ending
2361 # the task. post-job tasks continue when the job is aborted.
2362 pass
2363
2364
mbligh4608b002010-01-05 18:22:35 +00002365 def _pidfile_label(self):
2366 # '.autoserv_execute' -> 'autoserv'
2367 return self._pidfile_name()[1:-len('_execute')]
2368
2369
showard9bb960b2009-11-19 01:02:11 +00002370class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002371 """
2372 Task responsible for
2373 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2374 * copying logs to the results repository
2375 * spawning CleanupTasks for hosts, if necessary
2376 * spawning a FinalReparseTask for the job
2377 """
showardd1195652009-12-08 22:21:02 +00002378 def __init__(self, queue_entries, recover_run_monitor=None):
2379 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002380 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002381 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002382 self._set_ids(queue_entries=queue_entries)
2383
2384
2385 def _generate_command(self, results_dir):
2386 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002387 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002388 return [_autoserv_path, '-p',
2389 '--pidfile-label=%s' % self._pidfile_label(),
2390 '--use-existing-results', '--collect-crashinfo',
2391 '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
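
    # In short: reboot when the job was aborted, when reboot_after is ALWAYS,
    # or when it is IF_ALL_TESTS_PASSED and every test passed; otherwise the
    # host goes straight back to READY.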


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
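
    # os.WIFSIGNALED() interprets a raw wait()-style status word; for example,
    # os.WIFSIGNALED(9) is True (process killed by SIGKILL) while
    # os.WIFSIGNALED(0) is False (clean exit).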


class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()

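
# A minimal sketch of how a subclass opts in to the throttling above, using a
# hypothetical task name (the parse limit from scheduler_config is reused here
# purely for illustration):
#
#     class ThrottledExampleTask(SelfThrottledPostJobTask):
#         @classmethod
#         def _max_processes(cls):
#             return scheduler_config.config.max_parse_processes
#
# Note that "cls._num_running_processes += 1" rebinds the attribute on the
# subclass the first time it runs, so each subclass effectively keeps its own
# counter rather than sharing one across all SelfThrottledPostJobTask
# subclasses.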

class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]
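
    # Roughly, assuming _parser_path resolves to the tko parser executable:
    #   parse --write-pidfile -l 2 -r -o <results_dir>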


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]
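
    # Sketch of the resulting invocation, assuming the archiver pidfile yields
    # the label 'archiver':
    #   autoserv -p --pidfile-label=archiver -r <results_dir>
    #            --use-existing-results --control-filename=control.archive
    #            <AUTOTEST_INSTALL_DIR>/scheduler/archive_results.control.srv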


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()