#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats, metahost_scheduler
from autotest_lib.scheduler import scheduler_models

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda: ())


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise SchedulerError('Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
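    """
    Run the scheduler, logging any exception that escapes and removing the
    scheduler's pid file on the way out.
    """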
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
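    """
    Configure scheduler logging; the log directory and file name can be
    overridden through the AUTOTEST_SCHEDULER_LOG_DIR and
    AUTOTEST_SCHEDULER_LOG_NAME environment variables.
    """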
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
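    """
    One-time startup: claim the pid file, connect to the database, install
    the SIGINT handler and initialize the drone manager from global_config.
    """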
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
        if job.is_image_update_job():
            autoserv_argv += ['--image', job.update_image_path]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific schedulers selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
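        """Format a list of ids as a comma-separated string for SQL IN ()."""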
310 return ','.join(str(item_id) for item_id in id_list)
311
312
313 @classmethod
showard989f25d2008-10-01 11:38:11 +0000314 def _get_many2many_dict(cls, query, id_list, flip=False):
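        """
        Run a two-column query (with its %s filled in from id_list) and build
        a dict mapping each left id to the set of associated right ids; flip
        swaps the mapping direction.
        """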
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
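        """
        Rebuild the in-memory caches (ready hosts, ACLs, dependencies and
        label mappings) used to make scheduling decisions for this cycle.
        """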
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
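        # a host is ACL-accessible if the job's owner shares at least one ACL
        # group with the host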
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
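        # the host must have every label the job depends on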
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
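        """
        Return True if the host passes the ACL, dependency-label,
        only_if_needed and atomic group checks for this queue entry.
        """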
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
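        """
        Assign a host to the given queue entry: entries with a host already
        set are scheduled directly, while metahost entries are delegated to
        the first metahost scheduler that can handle them.
        """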
576 if queue_entry.host_id is not None:
showard63a34772008-08-18 19:32:50 +0000577 return self._schedule_non_metahost(queue_entry)
jamesren883492a2010-02-12 00:45:18 +0000578
579 for scheduler in self._metahost_schedulers:
580 if scheduler.can_schedule_metahost(queue_entry):
581 scheduler.schedule_metahost(queue_entry, self)
582 return None
583
584 raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
showard63a34772008-08-18 19:32:50 +0000585
586
showard89f84db2009-03-12 20:39:13 +0000587 def find_eligible_atomic_group(self, queue_entry):
588 """
589 Given an atomic group host queue entry, locate an appropriate group
590 of hosts for the associated job to run on.
591
592 The caller is responsible for creating new HQEs for the additional
593 hosts returned in order to run the actual job on them.
594
595 @returns A list of Host instances in a ready state to satisfy this
596 atomic group scheduling. Hosts will all belong to the same
597 atomic group label as specified by the queue_entry.
598 An empty list will be returned if no suitable atomic
599 group could be found.
600
601 TODO(gps): what is responsible for kicking off any attempted repairs on
602 a group of hosts? not this function, but something needs to. We do
603 not communicate that reason for returning [] outside of here...
604 For now, we'll just be unschedulable if enough hosts within one group
605 enter Repair Failed state.
606 """
607 assert queue_entry.atomic_group_id is not None
608 job = queue_entry.job
609 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000610 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000611 if job.synch_count > atomic_group.max_number_of_machines:
612 # Such a Job and HostQueueEntry should never be possible to
613 # create using the frontend. Regardless, we can't process it.
614 # Abort it immediately and log an error on the scheduler.
615 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000616 logging.error(
617 'Error: job %d synch_count=%d > requested atomic_group %d '
618 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
619 job.id, job.synch_count, atomic_group.id,
620 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000621 return []
jamesren883492a2010-02-12 00:45:18 +0000622 hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
623 ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000624
625 # Look in each label associated with atomic_group until we find one with
626 # enough hosts to satisfy the job.
627 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
jamesren883492a2010-02-12 00:45:18 +0000628 group_hosts = set(self.hosts_in_label(atomic_label_id))
showard89f84db2009-03-12 20:39:13 +0000629 if queue_entry.meta_host is not None:
630 # If we have a metahost label, only allow its hosts.
631 group_hosts.intersection_update(hosts_in_label)
632 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000633 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000634 group_hosts, queue_entry)
635
636 # Job.synch_count is treated as "minimum synch count" when
637 # scheduling for an atomic group of hosts. The atomic group
638 # number of machines is the maximum to pick out of a single
639 # atomic group label for scheduling at one time.
640 min_hosts = job.synch_count
641 max_hosts = atomic_group.max_number_of_machines
642
showard54c1ea92009-05-20 00:32:58 +0000643 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000644 # Not enough eligible hosts in this atomic group label.
645 continue
646
showard54c1ea92009-05-20 00:32:58 +0000647 eligible_hosts_in_group = [self._hosts_available[id]
648 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000649 # So that they show up in a sane order when viewing the job.
jamesrenc44ae992010-02-19 00:12:54 +0000650 eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000651
showard89f84db2009-03-12 20:39:13 +0000652 # Limit ourselves to scheduling the atomic group size.
653 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000654 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000655
656 # Remove the selected hosts from our cached internal state
657 # of available hosts in order to return the Host objects.
658 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000659 for host in eligible_hosts_in_group:
660 hosts_in_label.discard(host.id)
661 self._hosts_available.pop(host.id)
662 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000663 return host_list
664
665 return []
666
667
showard170873e2009-01-07 00:22:26 +0000668class Dispatcher(object):
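    """
    The scheduler's main loop: recovers state on startup, then on each tick
    finds work in the database, turns it into Agents and runs them, subject
    to the configured process throttling.
    """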
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
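        """
        Run one scheduling cycle: refresh drone state, run cleanup, process
        aborts and recurring runs, schedule work, tick the agents, then flush
        drone actions and queued emails.
        """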
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
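        """
        Wrap agent_task in an Agent, add it to the dispatch list and index it
        by the host and queue entry ids it touches.
        """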
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
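        """
        Recover state after a scheduler restart: reattach AgentTasks to
        still-running autoserv processes, then handle whatever could not be
        recovered.
        """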
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
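        """
        Dispatch each pending queue entry according to its type: hostless,
        unassigned atomic group, or ordinary single-host entry.
        """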
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
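        """
        Abort queue entries flagged as aborted in the database, along with
        their agents, then stop the affected jobs if necessary.
        """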
jamesrene7c65cb2010-06-08 20:38:10 +00001144 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +00001145 for entry in scheduler_models.HostQueueEntry.fetch(
1146 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001147 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001148 for agent in self.get_agents_for_entry(entry):
1149 agent.abort()
1150 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +00001151 jobs_to_stop.add(entry.job)
1152 for job in jobs_to_stop:
1153 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +00001154
1155
showard324bf812009-01-20 23:23:38 +00001156 def _can_start_agent(self, agent, num_started_this_cycle,
1157 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001158 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001159 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001160 return True
1161 # don't allow any nonzero-process agents to run after we've reached a
1162 # limit (this avoids starvation of many-process agents)
1163 if have_reached_limit:
1164 return False
1165 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001166 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001167 agent.task.owner_username,
1168 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001169 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001170 return False
1171 # if a single agent exceeds the per-cycle throttling, still allow it to
1172 # run when it's the first agent in the cycle
1173 if num_started_this_cycle == 0:
1174 return True
1175 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001176 if (num_started_this_cycle + agent.task.num_processes >
1177 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001178 return False
1179 return True
1180
1181
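    # A hedged, illustrative restatement of the throttling rules applied in
    # _can_start_agent() above; it is not called anywhere in the scheduler.
    # All parameters are plain values standing in for the agent, drone
    # manager and scheduler_config lookups the real method performs.
    @staticmethod
    def _example_can_start(num_processes, num_started_this_cycle,
                           have_reached_limit, max_runnable_processes,
                           max_started_per_cycle):
        if num_processes == 0:
            return True   # zero-process agents may always run
        if have_reached_limit:
            return False  # the per-cycle limit was hit earlier this cycle
        if num_processes > max_runnable_processes:
            return False  # total process throttle
        if num_started_this_cycle == 0:
            return True   # the first agent of a cycle may exceed the limit
        return (num_started_this_cycle + num_processes
                <= max_started_per_cycle)

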
jadmanski0afbb632008-06-06 21:10:57 +00001182 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001183 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001184 have_reached_limit = False
1185 # iterate over copy, so we can remove agents during iteration
1186 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001187 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001188 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001189 have_reached_limit):
1190 have_reached_limit = True
1191 continue
showardd1195652009-12-08 22:21:02 +00001192 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001193 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001194 if agent.is_done():
1195 logging.info("agent finished")
1196 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001197 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001198 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001199
1200
showard29f7cd22009-04-29 21:16:24 +00001201 def _process_recurring_runs(self):
1202 recurring_runs = models.RecurringRun.objects.filter(
1203 start_date__lte=datetime.datetime.now())
1204 for rrun in recurring_runs:
1205 # Create job from template
1206 job = rrun.job
1207 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001208 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001209
1210 host_objects = info['hosts']
1211 one_time_hosts = info['one_time_hosts']
1212 metahost_objects = info['meta_hosts']
1213 dependencies = info['dependencies']
1214 atomic_group = info['atomic_group']
1215
1216 for host in one_time_hosts or []:
1217 this_host = models.Host.create_one_time_host(host.hostname)
1218 host_objects.append(this_host)
1219
1220 try:
1221 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001222 options=options,
showard29f7cd22009-04-29 21:16:24 +00001223 host_objects=host_objects,
1224 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001225 atomic_group=atomic_group)
1226
1227 except Exception, ex:
1228 logging.exception(ex)
1229                # TODO: send email notification about the failure
1230
1231 if rrun.loop_count == 1:
1232 rrun.delete()
1233 else:
1234 if rrun.loop_count != 0: # if not infinite loop
1235 # calculate new start_date
1236 difference = datetime.timedelta(seconds=rrun.loop_period)
1237 rrun.start_date = rrun.start_date + difference
1238 rrun.loop_count -= 1
1239 rrun.save()
1240
1241
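# A standalone sketch (illustration only, never called) of the loop
# bookkeeping _process_recurring_runs() performs above after triggering a
# models.RecurringRun: loop_count == 1 means that was the final run, a
# larger count is decremented with the start date pushed forward by
# loop_period, and loop_count == 0 marks an infinite loop that is left
# untouched.
def _example_advance_recurring_run(rrun):
    if rrun.loop_count == 1:
        rrun.delete()               # final iteration: drop the template
    elif rrun.loop_count != 0:      # finite loop: schedule the next run
        rrun.start_date += datetime.timedelta(seconds=rrun.loop_period)
        rrun.loop_count -= 1
        rrun.save()

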
showard170873e2009-01-07 00:22:26 +00001242class PidfileRunMonitor(object):
1243 """
1244 Client must call either run() to start a new process or
1245 attach_to_existing_process().
1246 """
mbligh36768f02008-02-22 18:28:33 +00001247
showard170873e2009-01-07 00:22:26 +00001248 class _PidfileException(Exception):
1249 """
1250 Raised when there's some unexpected behavior with the pid file, but only
1251 used internally (never allowed to escape this class).
1252 """
mbligh36768f02008-02-22 18:28:33 +00001253
1254
showard170873e2009-01-07 00:22:26 +00001255 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001256 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001257 self._start_time = None
1258 self.pidfile_id = None
1259 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001260
1261
showard170873e2009-01-07 00:22:26 +00001262 def _add_nice_command(self, command, nice_level):
1263 if not nice_level:
1264 return command
1265 return ['nice', '-n', str(nice_level)] + command
1266
1267
1268 def _set_start_time(self):
1269 self._start_time = time.time()
1270
1271
showard418785b2009-11-23 20:19:59 +00001272 def run(self, command, working_directory, num_processes, nice_level=None,
1273 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001274 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001275 assert command is not None
1276        # reuse the helper above to prepend 'nice' when a level is given
1277        command = self._add_nice_command(command, nice_level)
1278 self._set_start_time()
1279 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001280 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001281 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001282 paired_with_pidfile=paired_with_pidfile, username=username,
1283 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001284
1285
showarded2afea2009-07-07 20:54:07 +00001286 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001287 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001288 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001289 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001290 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001291 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001292 if num_processes is not None:
1293 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001294
1295
jadmanski0afbb632008-06-06 21:10:57 +00001296 def kill(self):
showard170873e2009-01-07 00:22:26 +00001297 if self.has_process():
1298 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001299
mbligh36768f02008-02-22 18:28:33 +00001300
showard170873e2009-01-07 00:22:26 +00001301 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001302 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001303 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001304
1305
showard170873e2009-01-07 00:22:26 +00001306 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001307 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001308 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001309 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001310
1311
showard170873e2009-01-07 00:22:26 +00001312 def _read_pidfile(self, use_second_read=False):
1313 assert self.pidfile_id is not None, (
1314 'You must call run() or attach_to_existing_process()')
1315 contents = _drone_manager.get_pidfile_contents(
1316 self.pidfile_id, use_second_read=use_second_read)
1317 if contents.is_invalid():
1318 self._state = drone_manager.PidfileContents()
1319 raise self._PidfileException(contents)
1320 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001321
1322
showard21baa452008-10-21 00:08:39 +00001323 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001324 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1325 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001326 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001327 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001328
1329
1330 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001331 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001332 return
mblighbb421852008-03-11 22:36:16 +00001333
showard21baa452008-10-21 00:08:39 +00001334 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001335
showard170873e2009-01-07 00:22:26 +00001336 if self._state.process is None:
1337 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001338 return
mbligh90a549d2008-03-25 23:52:34 +00001339
showard21baa452008-10-21 00:08:39 +00001340 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001341 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001342 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001343 return
mbligh90a549d2008-03-25 23:52:34 +00001344
showard170873e2009-01-07 00:22:26 +00001345 # pid but no running process - maybe process *just* exited
1346 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001347 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001348 # autoserv exited without writing an exit code
1349 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001350 self._handle_pidfile_error(
1351 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001352
showard21baa452008-10-21 00:08:39 +00001353
1354 def _get_pidfile_info(self):
1355 """\
1356 After completion, self._state will contain:
1357        process=None, exit_status=None if autoserv has not yet run
1358        process!=None, exit_status=None if autoserv is running
1359        process!=None, exit_status!=None if autoserv has completed
1360 """
1361 try:
1362 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001363 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001364 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001365
1366
showard170873e2009-01-07 00:22:26 +00001367 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001368 """\
1369 Called when no pidfile is found or no pid is in the pidfile.
1370 """
showard170873e2009-01-07 00:22:26 +00001371 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001372 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001373 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001374 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001375 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001376
1377
showard35162b02009-03-03 02:17:30 +00001378 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001379 """\
1380 Called when autoserv has exited without writing an exit status,
1381 or we've timed out waiting for autoserv to write a pid to the
1382 pidfile. In either case, we just return failure and the caller
1383 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001384
showard170873e2009-01-07 00:22:26 +00001385 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001386 """
1387 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001388 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001389 self._state.exit_status = 1
1390 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001391
1392
jadmanski0afbb632008-06-06 21:10:57 +00001393 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001394 self._get_pidfile_info()
1395 return self._state.exit_status
1396
1397
1398 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001399 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001400 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001401 if self._state.num_tests_failed is None:
1402 return -1
showard21baa452008-10-21 00:08:39 +00001403 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001404
1405
showardcdaeae82009-08-31 18:32:48 +00001406 def try_copy_results_on_drone(self, **kwargs):
1407 if self.has_process():
1408 # copy results logs into the normal place for job results
1409 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1410
1411
1412 def try_copy_to_results_repository(self, source, **kwargs):
1413 if self.has_process():
1414 _drone_manager.copy_to_results_repository(self.get_process(),
1415 source, **kwargs)
1416
1417
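# Hedged usage sketch for PidfileRunMonitor above: a client either launches
# a fresh process with run() or, after a scheduler restart, re-attaches with
# attach_to_existing_process(), then polls exit_code() every cycle. The
# command, directory and username here are placeholders, not real scheduler
# values; the function is illustrative and never called.
def _example_monitor_usage():
    monitor = PidfileRunMonitor()
    monitor.run(command=['sleep', '1'],     # placeholder command
                working_directory='example/results/1-user',
                num_processes=1,
                nice_level=AUTOSERV_NICE_LEVEL,
                username='example-user')
    if monitor.exit_code() is None:
        return None                         # still running
    return monitor.exit_code() == 0         # True on clean exit

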
mbligh36768f02008-02-22 18:28:33 +00001418class Agent(object):
showard77182562009-06-10 00:16:05 +00001419 """
showard8cc058f2009-09-08 16:26:33 +00001420 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001421
1422 The following methods are required on all task objects:
1423 poll() - Called periodically to let the task check its status and
1424                 update its internal state.
1425 is_done() - Returns True if the task is finished.
1426 abort() - Called when an abort has been requested. The task must
1427 set its aborted attribute to True if it actually aborted.
1428
1429 The following attributes are required on all task objects:
1430 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001431 success - bool, True if this task succeeded.
1432 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1433 host_ids - A sequence of Host ids this task represents.
    An illustrative example task satisfying this interface appears just
    after this class.
showard77182562009-06-10 00:16:05 +00001434    """
1435
1436
showard418785b2009-11-23 20:19:59 +00001437 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001438 """
showard8cc058f2009-09-08 16:26:33 +00001439 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001440 """
showard8cc058f2009-09-08 16:26:33 +00001441 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001442
showard77182562009-06-10 00:16:05 +00001443 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001444 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001445
showard8cc058f2009-09-08 16:26:33 +00001446 self.queue_entry_ids = task.queue_entry_ids
1447 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001448
showard8cc058f2009-09-08 16:26:33 +00001449 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001450 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001451
1452
jadmanski0afbb632008-06-06 21:10:57 +00001453 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001454 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001455 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001456 self.task.poll()
1457 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001458 self.finished = True
showardec113162008-05-08 00:52:49 +00001459
1460
jadmanski0afbb632008-06-06 21:10:57 +00001461 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001462 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001463
1464
showardd3dc1992009-04-22 21:01:40 +00001465 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001466 if self.task:
1467 self.task.abort()
1468 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001469 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001470 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001471
showardd3dc1992009-04-22 21:01:40 +00001472
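# A hedged sketch of the smallest object satisfying the task interface that
# Agent (above) requires: poll(), is_done(), abort() plus the four listed
# attributes. Illustrative only; real tasks in this module derive from
# AgentTask below. Under these assumptions, Agent(_ExampleNoopTask()) could
# be handed to Dispatcher.add_agent() and would finish on its first tick.
class _ExampleNoopTask(object):
    def __init__(self):
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self._done = False


    def poll(self):
        # a real task would check on its process here; this one finishes
        # immediately and reports success
        self._done = True
        self.success = True


    def is_done(self):
        return self._done


    def abort(self):
        self.aborted = True
        self._done = True

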
mbligh36768f02008-02-22 18:28:33 +00001473class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001474 class _NullMonitor(object):
1475 pidfile_id = None
1476
1477 def has_process(self):
1478 return True
1479
1480
1481 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001482 """
showardd1195652009-12-08 22:21:02 +00001483 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001484 """
jadmanski0afbb632008-06-06 21:10:57 +00001485 self.done = False
showardd1195652009-12-08 22:21:02 +00001486 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001487 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001488 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001489 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001490 self.queue_entry_ids = []
1491 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001492 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001493
1494
1495 def _set_ids(self, host=None, queue_entries=None):
1496 if queue_entries and queue_entries != [None]:
1497 self.host_ids = [entry.host.id for entry in queue_entries]
1498 self.queue_entry_ids = [entry.id for entry in queue_entries]
1499 else:
1500 assert host
1501 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001502
1503
jadmanski0afbb632008-06-06 21:10:57 +00001504 def poll(self):
showard08a36412009-05-05 01:01:13 +00001505 if not self.started:
1506 self.start()
showardd1195652009-12-08 22:21:02 +00001507 if not self.done:
1508 self.tick()
showard08a36412009-05-05 01:01:13 +00001509
1510
1511 def tick(self):
showardd1195652009-12-08 22:21:02 +00001512 assert self.monitor
1513 exit_code = self.monitor.exit_code()
1514 if exit_code is None:
1515 return
mbligh36768f02008-02-22 18:28:33 +00001516
showardd1195652009-12-08 22:21:02 +00001517 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001518 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001519
1520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def is_done(self):
1522 return self.done
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001526 if self.done:
showardd1195652009-12-08 22:21:02 +00001527 assert self.started
showard08a36412009-05-05 01:01:13 +00001528 return
showardd1195652009-12-08 22:21:02 +00001529 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001530 self.done = True
1531 self.success = success
1532 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001533
1534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001536 """
1537 To be overridden.
1538 """
showarded2afea2009-07-07 20:54:07 +00001539 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001540 self.register_necessary_pidfiles()
1541
1542
1543 def _log_file(self):
1544 if not self._log_file_name:
1545 return None
1546 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001547
mbligh36768f02008-02-22 18:28:33 +00001548
jadmanski0afbb632008-06-06 21:10:57 +00001549 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001550 log_file = self._log_file()
1551 if self.monitor and log_file:
1552 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001553
1554
jadmanski0afbb632008-06-06 21:10:57 +00001555 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001556 """
1557 To be overridden.
1558 """
jadmanski0afbb632008-06-06 21:10:57 +00001559 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001560 logging.info("%s finished with success=%s", type(self).__name__,
1561 self.success)
1562
mbligh36768f02008-02-22 18:28:33 +00001563
1564
jadmanski0afbb632008-06-06 21:10:57 +00001565 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001566 if not self.started:
1567 self.prolog()
1568 self.run()
1569
1570 self.started = True
1571
1572
1573 def abort(self):
1574 if self.monitor:
1575 self.monitor.kill()
1576 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001577 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001578 self.cleanup()
1579
1580
showarded2afea2009-07-07 20:54:07 +00001581 def _get_consistent_execution_path(self, execution_entries):
1582 first_execution_path = execution_entries[0].execution_path()
1583 for execution_entry in execution_entries[1:]:
1584 assert execution_entry.execution_path() == first_execution_path, (
1585 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1586 execution_entry,
1587 first_execution_path,
1588 execution_entries[0]))
1589 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001590
1591
showarded2afea2009-07-07 20:54:07 +00001592 def _copy_results(self, execution_entries, use_monitor=None):
1593 """
1594 @param execution_entries: list of objects with execution_path() method
1595 """
showard6d1c1432009-08-20 23:30:39 +00001596 if use_monitor is not None and not use_monitor.has_process():
1597 return
1598
showarded2afea2009-07-07 20:54:07 +00001599 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001600 if use_monitor is None:
1601 assert self.monitor
1602 use_monitor = self.monitor
1603 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001604 execution_path = self._get_consistent_execution_path(execution_entries)
1605 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001606 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001607
showarda1e74b32009-05-12 17:32:04 +00001608
1609 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001610 for queue_entry in queue_entries:
1611 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001612
1613
mbligh4608b002010-01-05 18:22:35 +00001614 def _archive_results(self, queue_entries):
1615 for queue_entry in queue_entries:
1616 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001617
1618
showardd1195652009-12-08 22:21:02 +00001619 def _command_line(self):
1620 """
1621 Return the command line to run. Must be overridden.
1622 """
1623 raise NotImplementedError
1624
1625
1626 @property
1627 def num_processes(self):
1628 """
1629 Return the number of processes forked by this AgentTask's process. It
1630 may only be approximate. To be overridden if necessary.
1631 """
1632 return 1
1633
1634
1635 def _paired_with_monitor(self):
1636 """
1637 If this AgentTask's process must run on the same machine as some
1638 previous process, this method should be overridden to return a
1639 PidfileRunMonitor for that process.
1640 """
1641 return self._NullMonitor()
1642
1643
1644 @property
1645 def owner_username(self):
1646 """
1647 Return login of user responsible for this task. May be None. Must be
1648 overridden.
1649 """
1650 raise NotImplementedError
1651
1652
1653 def _working_directory(self):
1654 """
1655 Return the directory where this AgentTask's process executes. Must be
1656 overridden.
1657 """
1658 raise NotImplementedError
1659
1660
1661 def _pidfile_name(self):
1662 """
1663 Return the name of the pidfile this AgentTask's process uses. To be
1664 overridden if necessary.
1665 """
jamesrenc44ae992010-02-19 00:12:54 +00001666 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001667
1668
1669 def _check_paired_results_exist(self):
1670 if not self._paired_with_monitor().has_process():
1671 email_manager.manager.enqueue_notify_email(
1672 'No paired results in task',
1673 'No paired results in task %s at %s'
1674 % (self, self._paired_with_monitor().pidfile_id))
1675 self.finished(False)
1676 return False
1677 return True
1678
1679
1680 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001681 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001682 self.monitor = PidfileRunMonitor()
1683
1684
1685 def run(self):
1686 if not self._check_paired_results_exist():
1687 return
1688
1689 self._create_monitor()
1690 self.monitor.run(
1691 self._command_line(), self._working_directory(),
1692 num_processes=self.num_processes,
1693 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1694 pidfile_name=self._pidfile_name(),
1695 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001696 username=self.owner_username,
1697 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1698
1699
1700 def get_drone_hostnames_allowed(self):
1701 if not models.DroneSet.drone_sets_enabled():
1702 return None
1703
1704 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1705 if not hqes:
1706 # Only special tasks could be missing host queue entries
1707 assert isinstance(self, SpecialAgentTask)
1708 return self._user_or_global_default_drone_set(
1709 self.task, self.task.requested_by)
1710
1711 job_ids = hqes.values_list('job', flat=True).distinct()
1712 assert job_ids.count() == 1, ("AgentTask's queue entries "
1713 "span multiple jobs")
1714
1715 job = models.Job.objects.get(id=job_ids[0])
1716 drone_set = job.drone_set
1717 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001718 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001719
1720 return drone_set.get_drone_hostnames()
1721
1722
1723 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1724 """
1725 Returns the user's default drone set, if present.
1726
1727 Otherwise, returns the global default drone set.
1728 """
1729 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1730 if not user:
1731 logging.warn('%s had no owner; using default drone set',
1732 obj_with_owner)
1733 return default_hostnames
1734 if not user.drone_set:
1735 logging.warn('User %s has no default drone set, using global '
1736 'default', user.login)
1737 return default_hostnames
1738 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001739
1740
1741 def register_necessary_pidfiles(self):
1742 pidfile_id = _drone_manager.get_pidfile_id_from(
1743 self._working_directory(), self._pidfile_name())
1744 _drone_manager.register_pidfile(pidfile_id)
1745
1746 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1747 if paired_pidfile_id:
1748 _drone_manager.register_pidfile(paired_pidfile_id)
1749
1750
1751 def recover(self):
1752 if not self._check_paired_results_exist():
1753 return
1754
1755 self._create_monitor()
1756 self.monitor.attach_to_existing_process(
1757 self._working_directory(), pidfile_name=self._pidfile_name(),
1758 num_processes=self.num_processes)
1759 if not self.monitor.has_process():
1760 # no process to recover; wait to be started normally
1761 self.monitor = None
1762 return
1763
1764 self.started = True
1765        logging.info('Recovering process %s for %s at %s',
1766                     self.monitor.get_process(), type(self).__name__,
1767                     self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001768
1769
mbligh4608b002010-01-05 18:22:35 +00001770 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1771 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001772 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001773 for entry in queue_entries:
1774 if entry.status not in allowed_hqe_statuses:
jamesrenb8f3f352010-06-10 00:44:06 +00001775 raise SchedulerError('%s attempting to start '
mbligh4608b002010-01-05 18:22:35 +00001776 'entry with invalid status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001777 % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001778 invalid_host_status = (
1779 allowed_host_statuses is not None
1780 and entry.host.status not in allowed_host_statuses)
1781 if invalid_host_status:
jamesrenb8f3f352010-06-10 00:44:06 +00001782 raise SchedulerError('%s attempting to start on queue '
mbligh4608b002010-01-05 18:22:35 +00001783 'entry with invalid host status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001784 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001785
1786
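# Hedged sketch of a minimal concrete AgentTask, showing the three members
# every subclass must supply (_command_line(), _working_directory() and
# owner_username); the remaining hooks inherit workable defaults. The
# command, path and user are placeholders, and the class is never
# instantiated by the scheduler.
class _ExampleEchoTask(AgentTask):
    def _command_line(self):
        return ['echo', 'example-task']     # placeholder command


    def _working_directory(self):
        return 'example/results/echo'       # placeholder results path


    @property
    def owner_username(self):
        return 'example-user'               # placeholder owner

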
showardd9205182009-04-27 20:09:55 +00001787class TaskWithJobKeyvals(object):
1788 """AgentTask mixin providing functionality to help with job keyval files."""
1789 _KEYVAL_FILE = 'keyval'
1790 def _format_keyval(self, key, value):
1791 return '%s=%s' % (key, value)
1792
1793
1794 def _keyval_path(self):
1795 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001796 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001797
1798
1799 def _write_keyval_after_job(self, field, value):
1800 assert self.monitor
1801 if not self.monitor.has_process():
1802 return
1803 _drone_manager.write_lines_to_file(
1804 self._keyval_path(), [self._format_keyval(field, value)],
1805 paired_with_process=self.monitor.get_process())
1806
1807
1808 def _job_queued_keyval(self, job):
1809 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1810
1811
1812 def _write_job_finished(self):
1813 self._write_keyval_after_job("job_finished", int(time.time()))
1814
1815
showarddb502762009-09-09 15:31:20 +00001816 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1817 keyval_contents = '\n'.join(self._format_keyval(key, value)
1818 for key, value in keyval_dict.iteritems())
1819 # always end with a newline to allow additional keyvals to be written
1820 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001821 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001822 keyval_contents,
1823 file_path=keyval_path)
1824
1825
1826 def _write_keyvals_before_job(self, keyval_dict):
1827 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1828
1829
1830 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001831 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001832 host.hostname)
1833 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001834        all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001835 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1836 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1837
1838
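# For reference, the keyval helpers above emit plain "key=value" lines,
# always newline-terminated so later stages can append more entries. A
# hedged sketch of that rendering (iteration order is arbitrary, as in
# _write_keyvals_before_job_helper()):
def _example_render_keyvals(keyval_dict):
    return '\n'.join('%s=%s' % (key, value)
                     for key, value in keyval_dict.iteritems()) + '\n'

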
showard8cc058f2009-09-08 16:26:33 +00001839class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001840 """
1841 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1842 """
1843
1844 TASK_TYPE = None
1845 host = None
1846 queue_entry = None
1847
showardd1195652009-12-08 22:21:02 +00001848 def __init__(self, task, extra_command_args):
1849 super(SpecialAgentTask, self).__init__()
1850
lmrb7c5d272010-04-16 06:34:04 +00001851 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001852
jamesrenc44ae992010-02-19 00:12:54 +00001853 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001854 self.queue_entry = None
1855 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001856 self.queue_entry = scheduler_models.HostQueueEntry(
1857 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001858
showarded2afea2009-07-07 20:54:07 +00001859 self.task = task
1860 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001861
1862
showard8cc058f2009-09-08 16:26:33 +00001863 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001864 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1865
1866
1867 def _command_line(self):
1868 return _autoserv_command_line(self.host.hostname,
1869 self._extra_command_args,
1870 queue_entry=self.queue_entry)
1871
1872
1873 def _working_directory(self):
1874 return self.task.execution_path()
1875
1876
1877 @property
1878 def owner_username(self):
1879 if self.task.requested_by:
1880 return self.task.requested_by.login
1881 return None
showard8cc058f2009-09-08 16:26:33 +00001882
1883
showarded2afea2009-07-07 20:54:07 +00001884 def prolog(self):
1885 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001886 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001887 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001888
1889
showardde634ee2009-01-30 01:44:24 +00001890 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001891 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001892
showard2fe3f1d2009-07-06 20:19:11 +00001893 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001894 return # don't fail metahost entries, they'll be reassigned
1895
showard2fe3f1d2009-07-06 20:19:11 +00001896 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001897 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001898 return # entry has been aborted
1899
showard2fe3f1d2009-07-06 20:19:11 +00001900 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001901 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001902 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001903 self._write_keyval_after_job(queued_key, queued_time)
1904 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001905
showard8cc058f2009-09-08 16:26:33 +00001906 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001907 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001908 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001909 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001910
showard8cc058f2009-09-08 16:26:33 +00001911 pidfile_id = _drone_manager.get_pidfile_id_from(
1912 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001913 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001914 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001915
1916 if self.queue_entry.job.parse_failed_repair:
1917 self._parse_results([self.queue_entry])
1918 else:
1919 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001920
1921
1922 def cleanup(self):
1923 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001924
1925 # We will consider an aborted task to be "Failed"
1926 self.task.finish(bool(self.success))
1927
showardf85a0b72009-10-07 20:48:45 +00001928 if self.monitor:
1929 if self.monitor.has_process():
1930 self._copy_results([self.task])
1931 if self.monitor.pidfile_id is not None:
1932 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001933
1934
1935class RepairTask(SpecialAgentTask):
1936 TASK_TYPE = models.SpecialTask.Task.REPAIR
1937
1938
showardd1195652009-12-08 22:21:02 +00001939 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001940 """\
1941        @param task: the models.SpecialTask entry for this repair. If it
            has an associated queue entry, that entry is marked failed when
            the repair fails.
1942 """
1943 protection = host_protections.Protection.get_string(
1944 task.host.protection)
1945 # normalize the protection name
1946 protection = host_protections.Protection.get_attr_name(protection)
1947
1948 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001949 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001950
1951 # *don't* include the queue entry in IDs -- if the queue entry is
1952 # aborted, we want to leave the repair task running
1953 self._set_ids(host=self.host)
1954
1955
1956 def prolog(self):
1957 super(RepairTask, self).prolog()
1958 logging.info("repair_task starting")
1959 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001960
1961
jadmanski0afbb632008-06-06 21:10:57 +00001962 def epilog(self):
1963 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001964
jadmanski0afbb632008-06-06 21:10:57 +00001965 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001966 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001967 else:
showard8cc058f2009-09-08 16:26:33 +00001968 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001969 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001970 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001971
1972
showarded2afea2009-07-07 20:54:07 +00001973class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001974 def _copy_to_results_repository(self):
1975 if not self.queue_entry or self.queue_entry.meta_host:
1976 return
1977
1978 self.queue_entry.set_execution_subdir()
1979 log_name = os.path.basename(self.task.execution_path())
1980 source = os.path.join(self.task.execution_path(), 'debug',
1981 'autoserv.DEBUG')
1982 destination = os.path.join(
1983 self.queue_entry.execution_path(), log_name)
1984
1985 self.monitor.try_copy_to_results_repository(
1986 source, destination_path=destination)
1987
1988
showard170873e2009-01-07 00:22:26 +00001989 def epilog(self):
1990 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001991
showard775300b2009-09-09 15:30:50 +00001992 if self.success:
1993 return
showard8fe93b52008-11-18 17:53:22 +00001994
showard775300b2009-09-09 15:30:50 +00001995 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001996
showard775300b2009-09-09 15:30:50 +00001997 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001998 # effectively ignore failure for these hosts
1999 self.success = True
showard775300b2009-09-09 15:30:50 +00002000 return
2001
2002 if self.queue_entry:
2003 self.queue_entry.requeue()
2004
2005 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00002006 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00002007 queue_entry__id=self.queue_entry.id):
2008 self.host.set_status(models.Host.Status.REPAIR_FAILED)
2009 self._fail_queue_entry()
2010 return
2011
showard9bb960b2009-11-19 01:02:11 +00002012 queue_entry = models.HostQueueEntry.objects.get(
2013 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002014 else:
2015 queue_entry = None
2016
2017 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002018 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002019 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002020 queue_entry=queue_entry,
2021 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002022
showard8fe93b52008-11-18 17:53:22 +00002023
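# Hedged restatement of the failure handling in PreJobTask.epilog() above as
# a pure decision; the returned strings are illustrative labels only. A
# failure on a DO_NOT_VERIFY host is treated as success, a failure whose
# queue entry already has a repair on record fails that entry, and any other
# failure requeues the entry and requests a repair.
def _example_prejob_failure_action(host_protection, repair_already_recorded):
    if host_protection == host_protections.Protection.DO_NOT_VERIFY:
        return 'ignore the failure'
    if repair_already_recorded:
        return 'mark REPAIR_FAILED and fail the queue entry'
    return 'requeue the entry and create a repair task'

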
2024class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002025 TASK_TYPE = models.SpecialTask.Task.VERIFY
2026
2027
showardd1195652009-12-08 22:21:02 +00002028 def __init__(self, task):
2029 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00002030 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00002031
2032
jadmanski0afbb632008-06-06 21:10:57 +00002033 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002034 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00002035
showardb18134f2009-03-20 20:52:18 +00002036 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002037 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00002038 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2039 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00002040
jamesren42318f72010-05-10 23:40:59 +00002041 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00002042 # and there's no need to keep records of other requests.
2043 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00002044 host__id=self.host.id,
2045 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00002046 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00002047 queued_verifies = queued_verifies.exclude(id=self.task.id)
2048 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00002049
mbligh36768f02008-02-22 18:28:33 +00002050
jadmanski0afbb632008-06-06 21:10:57 +00002051 def epilog(self):
2052 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002053 if self.success:
showard8cc058f2009-09-08 16:26:33 +00002054 if self.queue_entry:
2055 self.queue_entry.on_pending()
2056 else:
2057 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002058
2059
mbligh4608b002010-01-05 18:22:35 +00002060class CleanupTask(PreJobTask):
2061 # note this can also run post-job, but when it does, it's running standalone
2062 # against the host (not related to the job), so it's not considered a
2063 # PostJobTask
2064
2065 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2066
2067
2068 def __init__(self, task, recover_run_monitor=None):
2069 super(CleanupTask, self).__init__(task, ['--cleanup'])
2070 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2071
2072
2073 def prolog(self):
2074 super(CleanupTask, self).prolog()
2075 logging.info("starting cleanup task for host: %s", self.host.hostname)
2076 self.host.set_status(models.Host.Status.CLEANING)
2077 if self.queue_entry:
2078 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2079
2080
2081 def _finish_epilog(self):
2082 if not self.queue_entry or not self.success:
2083 return
2084
2085 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2086 should_run_verify = (
2087 self.queue_entry.job.run_verify
2088 and self.host.protection != do_not_verify_protection)
2089 if should_run_verify:
2090 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2091 models.SpecialTask.objects.create(
2092 host=models.Host.objects.get(id=self.host.id),
2093 queue_entry=entry,
2094 task=models.SpecialTask.Task.VERIFY)
2095 else:
2096 self.queue_entry.on_pending()
2097
2098
2099 def epilog(self):
2100 super(CleanupTask, self).epilog()
2101
2102 if self.success:
2103 self.host.update_field('dirty', 0)
2104 self.host.set_status(models.Host.Status.READY)
2105
2106 self._finish_epilog()
2107
2108
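# Hedged restatement of the chaining decision in CleanupTask._finish_epilog()
# above: after a successful cleanup tied to a queue entry, a verify special
# task is created only when the job requests it and the host's protection
# level permits verification; otherwise the entry goes straight to pending.
def _example_should_run_verify(job_run_verify, host_protection):
    return (job_run_verify and
            host_protection != host_protections.Protection.DO_NOT_VERIFY)

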
showarda9545c02009-12-18 22:44:26 +00002109class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2110 """
2111 Common functionality for QueueTask and HostlessQueueTask
2112 """
2113 def __init__(self, queue_entries):
2114 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002115 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002116 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002117
2118
showard73ec0442009-02-07 02:05:20 +00002119 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002120 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002121
2122
jamesrenc44ae992010-02-19 00:12:54 +00002123 def _write_control_file(self, execution_path):
2124 control_path = _drone_manager.attach_file_to_execution(
2125 execution_path, self.job.control_file)
2126 return control_path
2127
2128
showardd1195652009-12-08 22:21:02 +00002129 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002130 execution_path = self.queue_entries[0].execution_path()
2131 control_path = self._write_control_file(execution_path)
2132 hostnames = ','.join(entry.host.hostname
2133 for entry in self.queue_entries
2134 if not entry.is_hostless())
2135
2136 execution_tag = self.queue_entries[0].execution_tag()
2137 params = _autoserv_command_line(
2138 hostnames,
2139 ['-P', execution_tag, '-n',
2140 _drone_manager.absolute_path(control_path)],
2141 job=self.job, verbose=False)
2142
2143 if not self.job.is_server_job():
2144 params.append('-c')
2145
2146 return params
showardd1195652009-12-08 22:21:02 +00002147
2148
2149 @property
2150 def num_processes(self):
2151 return len(self.queue_entries)
2152
2153
2154 @property
2155 def owner_username(self):
2156 return self.job.owner
2157
2158
2159 def _working_directory(self):
2160 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002161
2162
jadmanski0afbb632008-06-06 21:10:57 +00002163 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002164 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002165 keyval_dict = self.job.keyval_dict()
2166 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002167 group_name = self.queue_entries[0].get_group_name()
2168 if group_name:
2169 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002170 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002171 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002172 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002173 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002174
2175
showard35162b02009-03-03 02:17:30 +00002176 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002177 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002178 _drone_manager.write_lines_to_file(error_file_path,
2179 [_LOST_PROCESS_ERROR])
2180
2181
showardd3dc1992009-04-22 21:01:40 +00002182 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002183 if not self.monitor:
2184 return
2185
showardd9205182009-04-27 20:09:55 +00002186 self._write_job_finished()
2187
showard35162b02009-03-03 02:17:30 +00002188 if self.monitor.lost_process:
2189 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002190
jadmanskif7fa2cc2008-10-01 14:13:23 +00002191
showardcbd74612008-11-19 21:42:02 +00002192 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002193 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002194 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002195 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002196 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002197
2198
jadmanskif7fa2cc2008-10-01 14:13:23 +00002199 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002200 if not self.monitor or not self.monitor.has_process():
2201 return
2202
jadmanskif7fa2cc2008-10-01 14:13:23 +00002203 # build up sets of all the aborted_by and aborted_on values
2204 aborted_by, aborted_on = set(), set()
2205 for queue_entry in self.queue_entries:
2206 if queue_entry.aborted_by:
2207 aborted_by.add(queue_entry.aborted_by)
2208 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2209 aborted_on.add(t)
2210
2211 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002212        # TODO(showard): this conditional is now obsolete; it remains only
2213        # for backwards compatibility across upgrades. Delete it soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002215 assert len(aborted_by) <= 1
2216 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002217 aborted_by_value = aborted_by.pop()
2218 aborted_on_value = max(aborted_on)
2219 else:
2220 aborted_by_value = 'autotest_system'
2221 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002222
showarda0382352009-02-11 23:36:43 +00002223 self._write_keyval_after_job("aborted_by", aborted_by_value)
2224 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002225
showardcbd74612008-11-19 21:42:02 +00002226 aborted_on_string = str(datetime.datetime.fromtimestamp(
2227 aborted_on_value))
2228 self._write_status_comment('Job aborted by %s on %s' %
2229 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002230
2231
jadmanski0afbb632008-06-06 21:10:57 +00002232 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002233 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002234 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002235 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002236
2237
jadmanski0afbb632008-06-06 21:10:57 +00002238 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002239 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002240 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002241
2242
2243class QueueTask(AbstractQueueTask):
2244 def __init__(self, queue_entries):
2245 super(QueueTask, self).__init__(queue_entries)
2246 self._set_ids(queue_entries=queue_entries)
2247
2248
2249 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002250 self._check_queue_entry_statuses(
2251 self.queue_entries,
2252 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2253 models.HostQueueEntry.Status.RUNNING),
2254 allowed_host_statuses=(models.Host.Status.PENDING,
2255 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002256
2257 super(QueueTask, self).prolog()
2258
2259 for queue_entry in self.queue_entries:
2260 self._write_host_keyvals(queue_entry.host)
2261 queue_entry.host.set_status(models.Host.Status.RUNNING)
2262 queue_entry.host.update_field('dirty', 1)
2263 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2264 # TODO(gps): Remove this if nothing needs it anymore.
2265 # A potential user is: tko/parser
2266 self.job.write_to_machines_file(self.queue_entries[0])
2267
2268
2269 def _finish_task(self):
2270 super(QueueTask, self)._finish_task()
2271
2272 for queue_entry in self.queue_entries:
2273 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00002274 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00002275
2276
mbligh4608b002010-01-05 18:22:35 +00002277class HostlessQueueTask(AbstractQueueTask):
2278 def __init__(self, queue_entry):
2279 super(HostlessQueueTask, self).__init__([queue_entry])
2280 self.queue_entry_ids = [queue_entry.id]
2281
2282
2283 def prolog(self):
2284 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2285 super(HostlessQueueTask, self).prolog()
2286
2287
mbligh4608b002010-01-05 18:22:35 +00002288 def _finish_task(self):
2289 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002290 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002291
2292
showardd3dc1992009-04-22 21:01:40 +00002293class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002294 def __init__(self, queue_entries, log_file_name):
2295 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002296
showardd1195652009-12-08 22:21:02 +00002297 self.queue_entries = queue_entries
2298
showardd3dc1992009-04-22 21:01:40 +00002299 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002300 self._autoserv_monitor.attach_to_existing_process(
2301 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002302
showardd1195652009-12-08 22:21:02 +00002303
2304 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002305 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002306 return 'true'
2307 return self._generate_command(
2308 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002309
2310
2311 def _generate_command(self, results_dir):
2312 raise NotImplementedError('Subclasses must override this')
2313
2314
showardd1195652009-12-08 22:21:02 +00002315 @property
2316 def owner_username(self):
2317 return self.queue_entries[0].job.owner
2318
2319
2320 def _working_directory(self):
2321 return self._get_consistent_execution_path(self.queue_entries)
2322
2323
2324 def _paired_with_monitor(self):
2325 return self._autoserv_monitor
2326
2327
showardd3dc1992009-04-22 21:01:40 +00002328 def _job_was_aborted(self):
2329 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002330 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002331 queue_entry.update_from_database()
2332 if was_aborted is None: # first queue entry
2333 was_aborted = bool(queue_entry.aborted)
2334 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00002335 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
2336 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00002337 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00002338 'Inconsistent abort state',
2339 'Queue entries have inconsistent abort state:\n' +
2340 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00002341 # don't crash here, just assume true
2342 return True
2343 return was_aborted
2344
2345
showardd1195652009-12-08 22:21:02 +00002346 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002347 if self._job_was_aborted():
2348 return models.HostQueueEntry.Status.ABORTED
2349
2350 # we'll use a PidfileRunMonitor to read the autoserv exit status
2351 if self._autoserv_monitor.exit_code() == 0:
2352 return models.HostQueueEntry.Status.COMPLETED
2353 return models.HostQueueEntry.Status.FAILED
2354
2355
showardd3dc1992009-04-22 21:01:40 +00002356 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002357 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002358 queue_entry.set_status(status)
2359
2360
2361 def abort(self):
2362 # override AgentTask.abort() to avoid killing the process and ending
2363 # the task. post-job tasks continue when the job is aborted.
2364 pass
2365
2366
mbligh4608b002010-01-05 18:22:35 +00002367 def _pidfile_label(self):
2368 # '.autoserv_execute' -> 'autoserv'
2369 return self._pidfile_name()[1:-len('_execute')]
2370
2371
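# Hedged restatement of PostJobTask._final_status() above: an abort on any
# queue entry wins, then autoserv's recorded exit code decides between
# COMPLETED and FAILED.
def _example_final_status(was_aborted, autoserv_exit_code):
    if was_aborted:
        return models.HostQueueEntry.Status.ABORTED
    if autoserv_exit_code == 0:
        return models.HostQueueEntry.Status.COMPLETED
    return models.HostQueueEntry.Status.FAILED

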
showard9bb960b2009-11-19 01:02:11 +00002372class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002373 """
2374 Task responsible for
2375 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2376 * copying logs to the results repository
2377 * spawning CleanupTasks for hosts, if necessary
2378 * spawning a FinalReparseTask for the job
2379 """
showardd1195652009-12-08 22:21:02 +00002380 def __init__(self, queue_entries, recover_run_monitor=None):
2381 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002382 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002383 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002384 self._set_ids(queue_entries=queue_entries)
2385
2386
2387 def _generate_command(self, results_dir):
2388 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002389 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002390        return [_autoserv_path, '-p',
2391 '--pidfile-label=%s' % self._pidfile_label(),
2392 '--use-existing-results', '--collect-crashinfo',
2393 '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)
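        # policy summary (derived from the expression above): a cleanup/reboot
        # is scheduled when the job was aborted, when reboot_after is ALWAYS,
        # or when it is IF_ALL_TESTS_PASSED and autoserv both succeeded and
        # reported zero test failures; otherwise the host goes straight back
        # to Ready.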


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
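        # os.WIFSIGNALED() inspects a raw wait()-style status word, e.g.
        # (illustrative, POSIX semantics):
        #     os.WIFSIGNALED(0) -> False  (clean exit)
        #     os.WIFSIGNALED(9) -> True   (terminated by SIGKILL)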


class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
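# the throttling pattern above in miniature (illustrative sketch, not used by
# the scheduler). because the classmethods rebind _num_running_processes on
# whichever subclass they are called on, each subclass effectively gets its
# own counter, so e.g. parse and archive tasks are throttled independently:
#
#     class ThrottledTask(object):
#         _num_running = 0
#         _limit = 2
#
#         @classmethod
#         def _try_start(cls):
#             if cls._num_running < cls._limit:
#                 cls._num_running += 1  # rebinds the attribute on cls
#                 return True
#             return False  # caller retries on the next tick()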


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]
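        # assuming _parser_path points at the tko results parser, this yields
        # roughly (illustrative only):
        #     tko/parse --write-pidfile -l 2 -r -o <results_dir>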


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes
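    # max_parse_processes is read from global_config.ini by scheduler_config,
    # e.g. (illustrative snippet; section/key names per that module):
    #     [SCHEDULER]
    #     max_parse_processes: 5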


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]
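        # unlike the other post-job tasks, archiving reuses autoserv itself to
        # run a server-side control file; assuming the archiver pidfile is
        # named '.archiver_execute', the command looks roughly like
        # (illustrative only):
        #     autoserv -p --pidfile-label=archiver -r <results_dir> \
        #         --use-existing-results --control-filename=control.archive \
        #         <install_dir>/scheduler/archive_results.control.srv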


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())
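        # on failure this drops a marker file (<working_dir>/.archiver_failed)
        # next to the job results, so a failed archive run is visible without
        # digging through scheduler logs.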


if __name__ == '__main__':
    main()