blob: 556eb1747b04173f167bfda8287b7f6af60541f1 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
Eric Li6f27d4f2010-09-29 10:55:17 -070010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
# Pid-file prefixes used by utils.write_pid()/program_is_alive() to detect an
# already-running scheduler (or its babysitter wrapper).
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Default results directory; overwritten in main_without_exception_handling()
# from the required positional command-line argument.
RESULTS_DIR = '.'
# Niceness applied to spawned autoserv processes.
AUTOSERV_NICE_LEVEL = 10
# global_config section holding the AFE database connection settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
# Autotest install root: $AUTOTEST_DIR if set, else the parent of this file.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level singletons, populated during initialize():
# _db             - DatabaseConnection to the AFE database.
# _shutdown       - set True by the SIGINT handler to stop the main loop.
# _autoserv_path  - autoserv executable (swapped for a dummy in test mode).
# _testing_mode   - True when --test was passed on the command line.
# _drone_manager  - the drone_manager singleton.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000058
Eric Lie0493a42010-11-15 13:05:43 -080059def _parser_path_default(install_dir):
60 return os.path.join(install_dir, 'tko', 'parse')
# Resolve the results-parser executable path, letting a site-specific
# parser_path() implementation override _parser_path_default.
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)
65
mbligh36768f02008-02-22 18:28:33 +000066
def _get_pidfile_timeout_secs():
    """@returns Number of seconds to wait for autoserv to write its pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
72
73
mbligh83c1e9e2009-05-01 23:10:41 +000074def _site_init_monitor_db_dummy():
75 return {}
76
77
# Site hook returning extra metahost schedulers; defaults to an empty tuple
# when no site implementation is installed.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000081
82
def _verify_default_drone_set_exists():
    """Raise SchedulerError if drone sets are enabled but no default is named."""
    enabled = models.DroneSet.drone_sets_enabled()
    if enabled and not models.DroneSet.default_drone_set_name():
        raise SchedulerError('Drone sets are enabled, but no default is set')
87
88
def _sanity_check():
    """Verify configuration consistency before the scheduler starts running."""
    _verify_default_drone_set_exists()
92
93
def main():
    """Top-level scheduler entry point.

    Runs main_without_exception_handling(), letting SystemExit propagate
    untouched while any other escaping exception is logged before being
    re-raised.  The pid file is removed on every exit path.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # Deliberate exits (e.g. bad config) should not be logged as bugs.
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +0000105
106
def main_without_exception_handling():
    """Parse arguments, initialize all subsystems and run the dispatch loop.

    Exits with status 1 if the scheduler is disabled in global_config.
    On loop exit (shutdown request or uncaught exception) flushes queued
    emails and shuts down the status server, drone manager and database.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # Exactly one positional argument (results_dir) is required.
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give site-specific code a chance to run before anything else starts.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        # Test mode: substitute a dummy autoserv and skip result parsing.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: one tick per iteration until SIGINT (sets _shutdown) or
        # a shutdown request arrives through the status server.
        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Best-effort orderly teardown, reached on both clean and error exits.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000173
174
def setup_logging():
    """Configure scheduler logging, honouring optional environment overrides."""
    environ = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=environ.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            logfile_name=environ.get('AUTOTEST_SCHEDULER_LOG_NAME'))
181
182
def handle_sigint(signum, frame):
    """SIGINT handler: request that the main loop exit after its current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000187
188
def initialize():
    """Bring up all scheduler subsystems, in dependency order.

    Aborts (exit status 1) if another monitor_db instance holds the pid file.
    Connects the AFE database, installs the SIGINT handler, and initializes
    the drone manager from the global_config drone/results_host settings.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same database.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Comma-separated drone host list and the host that serves results.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000226
227
def initialize_globals():
    """Populate the module-level _drone_manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
231
232
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation as an argv list (executable + parameters).

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The full autoserv command line as a list of strings.
    """
    cmd = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        cmd.extend(['-m', machines])
    if job or queue_entry:
        if job is None:
            job = queue_entry.job
        cmd.extend(['-u', job.owner, '-l', job.name])
        if job.is_image_update_job():
            cmd.extend(['--image', job.update_image_path])
    if verbose:
        cmd.append('--verbose')
    return cmd + extra_args
259
260
class SchedulerError(Exception):
    """Signals that HostScheduler detected an inconsistent scheduling state."""
263
264
class BaseHostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.

    Per-cycle caches (rebuilt by refresh()):
      _hosts_available    - {host_id: Host} for hosts that may be scheduled
      _job_acls           - {job_id: set(aclgroup_id)}
      _ineligible_hosts   - {job_id: set(host_id)}
      _job_dependencies   - {job_id: set(label_id)}
      _host_acls          - {host_id: set(aclgroup_id)}
      _label_hosts        - {label_id: set(host_id)}
      _host_labels        - {host_id: set(label_id)}
      _labels             - {label_id: Label}
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready hosts with no active HQE."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                      'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN (...)."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with a %s IN-list placeholder) and map col0 -> set(col1)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left: set(right)}; flip swaps the columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL-group ids of the job owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the host ids it is explicitly ineligible for."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to its dependency label ids."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL-group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give every metahost scheduler a chance to recover state on startup."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all per-cycle caches for the given pending queue entries."""
        self._hosts_available = self._get_ready_hosts()

        # Only query job-related tables for jobs that actually have pending
        # entries this cycle.
        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the per-cycle tick to every metahost scheduler."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a fresh set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached membership of label_id."""
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove and return an available host, marking it used this cycle."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a fresh set of host ids the entry's job may not use."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job owner shares at least one ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job
        neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error if more than
        one atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host satisfies ACL, dependency, only_if_needed and
        atomic-group constraints for the entry's job."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy if the host is still available and flagged invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's specific host, or None if ineligible."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Assign a host to the entry.

        Non-metahost entries claim their specific host directly; metahost
        entries are offered to each metahost scheduler in precedence order.

        @raises SchedulerError if no metahost scheduler accepts the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
666
667
Eric Li861b2d52011-02-04 14:50:35 -0800668site_host_scheduler = utils.import_site_class(__file__,
669 "autotest_lib.scheduler.site_host_scheduler",
670 "site_host_scheduler", BaseHostScheduler)
671
672
673class HostScheduler(site_host_scheduler):
674 pass
675
676
showard170873e2009-01-07 00:22:26 +0000677class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000678 def __init__(self):
679 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000680 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000681 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000682 user_cleanup_time = scheduler_config.config.clean_interval
683 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
684 _db, user_cleanup_time)
685 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000686 self._host_agents = {}
687 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000688 self._tick_count = 0
689 self._last_garbage_stats_time = time.time()
690 self._seconds_between_garbage_stats = 60 * (
691 global_config.global_config.get_config_value(
692 scheduler_config.CONFIG_SECTION,
693 'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000694
mbligh36768f02008-02-22 18:28:33 +0000695
showard915958d2009-04-22 21:00:58 +0000696 def initialize(self, recover_hosts=True):
697 self._periodic_cleanup.initialize()
698 self._24hr_upkeep.initialize()
699
jadmanski0afbb632008-06-06 21:10:57 +0000700 # always recover processes
701 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000702
jadmanski0afbb632008-06-06 21:10:57 +0000703 if recover_hosts:
704 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000705
jamesrenc44ae992010-02-19 00:12:54 +0000706 self._host_scheduler.recovery_on_startup()
707
mbligh36768f02008-02-22 18:28:33 +0000708
jadmanski0afbb632008-06-06 21:10:57 +0000709 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000710 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000711 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000712 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000713 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000714 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000715 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000716 self._schedule_running_host_queue_entries()
717 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000718 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000719 self._handle_agents()
jamesrene21bf412010-02-26 02:30:07 +0000720 self._host_scheduler.tick()
showard170873e2009-01-07 00:22:26 +0000721 _drone_manager.execute_actions()
722 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000723 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000724 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000725
showard97aed502008-11-04 02:01:24 +0000726
mblighf3294cc2009-04-08 21:17:38 +0000727 def _run_cleanup(self):
728 self._periodic_cleanup.run_cleanup_maybe()
729 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000730
mbligh36768f02008-02-22 18:28:33 +0000731
showardf13a9e22009-12-18 22:54:09 +0000732 def _garbage_collection(self):
733 threshold_time = time.time() - self._seconds_between_garbage_stats
734 if threshold_time < self._last_garbage_stats_time:
735 # Don't generate these reports very often.
736 return
737
738 self._last_garbage_stats_time = time.time()
739 # Force a full level 0 collection (because we can, it doesn't hurt
740 # at this interval).
741 gc.collect()
742 logging.info('Logging garbage collector stats on tick %d.',
743 self._tick_count)
744 gc_stats._log_garbage_collector_stats()
745
746
showard170873e2009-01-07 00:22:26 +0000747 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
748 for object_id in object_ids:
749 agent_dict.setdefault(object_id, set()).add(agent)
750
751
752 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
753 for object_id in object_ids:
754 assert object_id in agent_dict
755 agent_dict[object_id].remove(agent)
756
757
showardd1195652009-12-08 22:21:02 +0000758 def add_agent_task(self, agent_task):
759 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000760 self._agents.append(agent)
761 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000762 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
763 self._register_agent_for_ids(self._queue_entry_agents,
764 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000765
showard170873e2009-01-07 00:22:26 +0000766
767 def get_agents_for_entry(self, queue_entry):
768 """
769 Find agents corresponding to the specified queue_entry.
770 """
showardd3dc1992009-04-22 21:01:40 +0000771 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000772
773
774 def host_has_agent(self, host):
775 """
776 Determine if there is currently an Agent present using this host.
777 """
778 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000779
780
jadmanski0afbb632008-06-06 21:10:57 +0000781 def remove_agent(self, agent):
782 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000783 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
784 agent)
785 self._unregister_agent_for_ids(self._queue_entry_agents,
786 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000787
788
showard8cc058f2009-09-08 16:26:33 +0000789 def _host_has_scheduled_special_task(self, host):
790 return bool(models.SpecialTask.objects.filter(host__id=host.id,
791 is_active=False,
792 is_complete=False))
793
794
jadmanski0afbb632008-06-06 21:10:57 +0000795 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000796 agent_tasks = self._create_recovery_agent_tasks()
797 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000798 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000799 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000800 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000801 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000802 self._reverify_remaining_hosts()
803 # reinitialize drones after killing orphaned processes, since they can
804 # leave around files when they die
805 _drone_manager.execute_actions()
806 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000807
showard170873e2009-01-07 00:22:26 +0000808
showardd1195652009-12-08 22:21:02 +0000809 def _create_recovery_agent_tasks(self):
810 return (self._get_queue_entry_agent_tasks()
811 + self._get_special_task_agent_tasks(is_active=True))
812
813
814 def _get_queue_entry_agent_tasks(self):
815 # host queue entry statuses handled directly by AgentTasks (Verifying is
816 # handled through SpecialTasks, so is not listed here)
817 statuses = (models.HostQueueEntry.Status.STARTING,
818 models.HostQueueEntry.Status.RUNNING,
819 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000820 models.HostQueueEntry.Status.PARSING,
821 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000822 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000823 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000824 where='status IN (%s)' % status_list)
825
826 agent_tasks = []
827 used_queue_entries = set()
828 for entry in queue_entries:
829 if self.get_agents_for_entry(entry):
830 # already being handled
831 continue
832 if entry in used_queue_entries:
833 # already picked up by a synchronous job
834 continue
835 agent_task = self._get_agent_task_for_queue_entry(entry)
836 agent_tasks.append(agent_task)
837 used_queue_entries.update(agent_task.queue_entries)
838 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000839
840
showardd1195652009-12-08 22:21:02 +0000841 def _get_special_task_agent_tasks(self, is_active=False):
842 special_tasks = models.SpecialTask.objects.filter(
843 is_active=is_active, is_complete=False)
844 return [self._get_agent_task_for_special_task(task)
845 for task in special_tasks]
846
847
848 def _get_agent_task_for_queue_entry(self, queue_entry):
849 """
850 Construct an AgentTask instance for the given active HostQueueEntry,
851 if one can currently run it.
852 @param queue_entry: a HostQueueEntry
853 @returns an AgentTask to run the queue entry
854 """
855 task_entries = queue_entry.job.get_group_entries(queue_entry)
856 self._check_for_duplicate_host_entries(task_entries)
857
858 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
859 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000860 if queue_entry.is_hostless():
861 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000862 return QueueTask(queue_entries=task_entries)
863 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
864 return GatherLogsTask(queue_entries=task_entries)
865 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
866 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000867 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
868 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000869
870 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000871 'invalid status %s: %s'
872 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000873
874
875 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000876 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
877 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000878 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000879 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000880 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000881 if using_host:
showardd1195652009-12-08 22:21:02 +0000882 self._assert_host_has_no_agent(task_entry)
883
884
885 def _assert_host_has_no_agent(self, entry):
886 """
887 @param entry: a HostQueueEntry or a SpecialTask
888 """
889 if self.host_has_agent(entry.host):
890 agent = tuple(self._host_agents.get(entry.host.id))[0]
891 raise SchedulerError(
892 'While scheduling %s, host %s already has a host agent %s'
893 % (entry, entry.host, agent.task))
894
895
896 def _get_agent_task_for_special_task(self, special_task):
897 """
898 Construct an AgentTask class to run the given SpecialTask and add it
899 to this dispatcher.
900 @param special_task: a models.SpecialTask instance
901 @returns an AgentTask to run this SpecialTask
902 """
903 self._assert_host_has_no_agent(special_task)
904
905 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
906 for agent_task_class in special_agent_task_classes:
907 if agent_task_class.TASK_TYPE == special_task.task:
908 return agent_task_class(task=special_task)
909
910 raise SchedulerError('No AgentTask class for task', str(special_task))
911
912
913 def _register_pidfiles(self, agent_tasks):
914 for agent_task in agent_tasks:
915 agent_task.register_necessary_pidfiles()
916
917
918 def _recover_tasks(self, agent_tasks):
919 orphans = _drone_manager.get_orphaned_autoserv_processes()
920
921 for agent_task in agent_tasks:
922 agent_task.recover()
923 if agent_task.monitor and agent_task.monitor.has_process():
924 orphans.discard(agent_task.monitor.get_process())
925 self.add_agent_task(agent_task)
926
927 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000928
929
showard8cc058f2009-09-08 16:26:33 +0000930 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000931 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
932 % status):
showard0db3d432009-10-12 20:29:15 +0000933 if entry.status == status and not self.get_agents_for_entry(entry):
934 # The status can change during iteration, e.g., if job.run()
935 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000936 yield entry
937
938
showard6878e8b2009-07-20 22:37:45 +0000939 def _check_for_remaining_orphan_processes(self, orphans):
940 if not orphans:
941 return
942 subject = 'Unrecovered orphan autoserv processes remain'
943 message = '\n'.join(str(process) for process in orphans)
944 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000945
946 die_on_orphans = global_config.global_config.get_config_value(
947 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
948
949 if die_on_orphans:
950 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000951
showard170873e2009-01-07 00:22:26 +0000952
showard8cc058f2009-09-08 16:26:33 +0000953 def _recover_pending_entries(self):
954 for entry in self._get_unassigned_entries(
955 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000956 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000957 entry.on_pending()
958
959
showardb8900452009-10-12 20:31:01 +0000960 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000961 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000962 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
963 unrecovered_hqes = []
964 for queue_entry in queue_entries:
965 special_tasks = models.SpecialTask.objects.filter(
966 task__in=(models.SpecialTask.Task.CLEANUP,
967 models.SpecialTask.Task.VERIFY),
968 queue_entry__id=queue_entry.id,
969 is_complete=False)
970 if special_tasks.count() == 0:
971 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000972
showardb8900452009-10-12 20:31:01 +0000973 if unrecovered_hqes:
974 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000975 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000976 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000977 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000978
979
showard65db3932009-10-28 19:54:35 +0000980 def _get_prioritized_special_tasks(self):
981 """
982 Returns all queued SpecialTasks prioritized for repair first, then
983 cleanup, then verify.
984 """
985 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
986 is_complete=False,
987 host__locked=False)
988 # exclude hosts with active queue entries unless the SpecialTask is for
989 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000990 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000991 queued_tasks, 'afe_host_queue_entries', 'host_id',
992 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000993 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000994 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000995 where=['(afe_host_queue_entries.id IS NULL OR '
996 'afe_host_queue_entries.id = '
997 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000998
showard65db3932009-10-28 19:54:35 +0000999 # reorder tasks by priority
1000 task_priority_order = [models.SpecialTask.Task.REPAIR,
1001 models.SpecialTask.Task.CLEANUP,
1002 models.SpecialTask.Task.VERIFY]
1003 def task_priority_key(task):
1004 return task_priority_order.index(task.task)
1005 return sorted(queued_tasks, key=task_priority_key)
1006
1007
showard65db3932009-10-28 19:54:35 +00001008 def _schedule_special_tasks(self):
1009 """
1010 Execute queued SpecialTasks that are ready to run on idle hosts.
1011 """
1012 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +00001013 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +00001014 continue
showardd1195652009-12-08 22:21:02 +00001015 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +00001016
1017
showard170873e2009-01-07 00:22:26 +00001018 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +00001019 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +00001020 # should never happen
showarded2afea2009-07-07 20:54:07 +00001021 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +00001022 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +00001023 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +00001024 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +00001025 print_message=message)
mblighbb421852008-03-11 22:36:16 +00001026
1027
jadmanski0afbb632008-06-06 21:10:57 +00001028 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +00001029 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +00001030 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +00001031 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +00001032 if self.host_has_agent(host):
1033 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001034 continue
showard8cc058f2009-09-08 16:26:33 +00001035 if self._host_has_scheduled_special_task(host):
1036 # host will have a special task scheduled on the next cycle
1037 continue
showard170873e2009-01-07 00:22:26 +00001038 if print_message:
showardb18134f2009-03-20 20:52:18 +00001039 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001040 models.SpecialTask.objects.create(
1041 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001042 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001043
1044
jadmanski0afbb632008-06-06 21:10:57 +00001045 def _recover_hosts(self):
1046 # recover "Repair Failed" hosts
1047 message = 'Reverifying dead host %s'
1048 self._reverify_hosts_where("status = 'Repair Failed'",
1049 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001050
1051
showard04c82c52008-05-29 19:38:12 +00001052
showardb95b1bd2008-08-15 18:11:04 +00001053 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001054 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +00001055 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001056 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001057 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001058 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001059
1060
showard89f84db2009-03-12 20:39:13 +00001061 def _refresh_pending_queue_entries(self):
1062 """
1063 Lookup the pending HostQueueEntries and call our HostScheduler
1064 refresh() method given that list. Return the list.
1065
1066 @returns A list of pending HostQueueEntries sorted in priority order.
1067 """
showard63a34772008-08-18 19:32:50 +00001068 queue_entries = self._get_pending_queue_entries()
1069 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001070 return []
showardb95b1bd2008-08-15 18:11:04 +00001071
showard63a34772008-08-18 19:32:50 +00001072 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001073
showard89f84db2009-03-12 20:39:13 +00001074 return queue_entries
1075
1076
1077 def _schedule_atomic_group(self, queue_entry):
1078 """
1079 Schedule the given queue_entry on an atomic group of hosts.
1080
1081 Returns immediately if there are insufficient available hosts.
1082
1083 Creates new HostQueueEntries based off of queue_entry for the
1084 scheduled hosts and starts them all running.
1085 """
1086 # This is a virtual host queue entry representing an entire
1087 # atomic group, find a group and schedule their hosts.
1088 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1089 queue_entry)
1090 if not group_hosts:
1091 return
showardcbe6f942009-06-17 19:33:49 +00001092
1093 logging.info('Expanding atomic group entry %s with hosts %s',
1094 queue_entry,
1095 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +00001096
showard89f84db2009-03-12 20:39:13 +00001097 for assigned_host in group_hosts[1:]:
1098 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +00001099 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001100 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +00001101 new_hqe.set_host(assigned_host)
1102 self._run_queue_entry(new_hqe)
1103
1104 # The first assigned host uses the original HostQueueEntry
1105 queue_entry.set_host(group_hosts[0])
1106 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001107
1108
showarda9545c02009-12-18 22:44:26 +00001109 def _schedule_hostless_job(self, queue_entry):
1110 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +00001111 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +00001112
1113
showard89f84db2009-03-12 20:39:13 +00001114 def _schedule_new_jobs(self):
1115 queue_entries = self._refresh_pending_queue_entries()
1116 if not queue_entries:
1117 return
1118
showard63a34772008-08-18 19:32:50 +00001119 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001120 is_unassigned_atomic_group = (
1121 queue_entry.atomic_group_id is not None
1122 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001123
1124 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001125 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001126 elif is_unassigned_atomic_group:
1127 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001128 else:
jamesren883492a2010-02-12 00:45:18 +00001129 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001130 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001131 assert assigned_host.id == queue_entry.host_id
1132 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001133
1134
showard8cc058f2009-09-08 16:26:33 +00001135 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001136 for agent_task in self._get_queue_entry_agent_tasks():
1137 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001138
1139
1140 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +00001141 for entry in scheduler_models.HostQueueEntry.fetch(
1142 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001143 task = entry.job.schedule_delayed_callback_task(entry)
1144 if task:
showardd1195652009-12-08 22:21:02 +00001145 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001146
1147
jamesren883492a2010-02-12 00:45:18 +00001148 def _run_queue_entry(self, queue_entry):
1149 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001150
1151
jadmanski0afbb632008-06-06 21:10:57 +00001152 def _find_aborting(self):
jamesrene7c65cb2010-06-08 20:38:10 +00001153 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +00001154 for entry in scheduler_models.HostQueueEntry.fetch(
1155 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001156 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001157 for agent in self.get_agents_for_entry(entry):
1158 agent.abort()
1159 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +00001160 jobs_to_stop.add(entry.job)
1161 for job in jobs_to_stop:
1162 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +00001163
1164
showard324bf812009-01-20 23:23:38 +00001165 def _can_start_agent(self, agent, num_started_this_cycle,
1166 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001167 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001168 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001169 return True
1170 # don't allow any nonzero-process agents to run after we've reached a
1171 # limit (this avoids starvation of many-process agents)
1172 if have_reached_limit:
1173 return False
1174 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001175 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001176 agent.task.owner_username,
1177 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001178 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001179 return False
1180 # if a single agent exceeds the per-cycle throttling, still allow it to
1181 # run when it's the first agent in the cycle
1182 if num_started_this_cycle == 0:
1183 return True
1184 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001185 if (num_started_this_cycle + agent.task.num_processes >
1186 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001187 return False
1188 return True
1189
1190
jadmanski0afbb632008-06-06 21:10:57 +00001191 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001192 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001193 have_reached_limit = False
1194 # iterate over copy, so we can remove agents during iteration
1195 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001196 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001197 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001198 have_reached_limit):
1199 have_reached_limit = True
1200 continue
showardd1195652009-12-08 22:21:02 +00001201 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001202 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001203 if agent.is_done():
1204 logging.info("agent finished")
1205 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001206 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001207 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001208
1209
showard29f7cd22009-04-29 21:16:24 +00001210 def _process_recurring_runs(self):
1211 recurring_runs = models.RecurringRun.objects.filter(
1212 start_date__lte=datetime.datetime.now())
1213 for rrun in recurring_runs:
1214 # Create job from template
1215 job = rrun.job
1216 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001217 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001218
1219 host_objects = info['hosts']
1220 one_time_hosts = info['one_time_hosts']
1221 metahost_objects = info['meta_hosts']
1222 dependencies = info['dependencies']
1223 atomic_group = info['atomic_group']
1224
1225 for host in one_time_hosts or []:
1226 this_host = models.Host.create_one_time_host(host.hostname)
1227 host_objects.append(this_host)
1228
1229 try:
1230 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001231 options=options,
showard29f7cd22009-04-29 21:16:24 +00001232 host_objects=host_objects,
1233 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001234 atomic_group=atomic_group)
1235
1236 except Exception, ex:
1237 logging.exception(ex)
1238 #TODO send email
1239
1240 if rrun.loop_count == 1:
1241 rrun.delete()
1242 else:
1243 if rrun.loop_count != 0: # if not infinite loop
1244 # calculate new start_date
1245 difference = datetime.timedelta(seconds=rrun.loop_period)
1246 rrun.start_date = rrun.start_date + difference
1247 rrun.loop_count -= 1
1248 rrun.save()
1249
1250
showard170873e2009-01-07 00:22:26 +00001251class PidfileRunMonitor(object):
1252 """
1253 Client must call either run() to start a new process or
1254 attach_to_existing_process().
1255 """
mbligh36768f02008-02-22 18:28:33 +00001256
showard170873e2009-01-07 00:22:26 +00001257 class _PidfileException(Exception):
1258 """
1259 Raised when there's some unexpected behavior with the pid file, but only
1260 used internally (never allowed to escape this class).
1261 """
mbligh36768f02008-02-22 18:28:33 +00001262
1263
showard170873e2009-01-07 00:22:26 +00001264 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001265 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001266 self._start_time = None
1267 self.pidfile_id = None
1268 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001269
1270
showard170873e2009-01-07 00:22:26 +00001271 def _add_nice_command(self, command, nice_level):
1272 if not nice_level:
1273 return command
1274 return ['nice', '-n', str(nice_level)] + command
1275
1276
    def _set_start_time(self):
        # Record when we started/attached to the process, for later
        # elapsed-time checks.
        self._start_time = time.time()
1279
1280
showard418785b2009-11-23 20:19:59 +00001281 def run(self, command, working_directory, num_processes, nice_level=None,
1282 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001283 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001284 assert command is not None
1285 if nice_level is not None:
1286 command = ['nice', '-n', str(nice_level)] + command
1287 self._set_start_time()
1288 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001289 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001290 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001291 paired_with_pidfile=paired_with_pidfile, username=username,
1292 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001293
1294
showarded2afea2009-07-07 20:54:07 +00001295 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001296 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001297 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001298 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001299 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001300 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001301 if num_processes is not None:
1302 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001303
1304
jadmanski0afbb632008-06-06 21:10:57 +00001305 def kill(self):
showard170873e2009-01-07 00:22:26 +00001306 if self.has_process():
1307 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001308
mbligh36768f02008-02-22 18:28:33 +00001309
showard170873e2009-01-07 00:22:26 +00001310 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001311 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001312 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001313
1314
showard170873e2009-01-07 00:22:26 +00001315 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001316 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001317 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001318 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001319
1320
showard170873e2009-01-07 00:22:26 +00001321 def _read_pidfile(self, use_second_read=False):
1322 assert self.pidfile_id is not None, (
1323 'You must call run() or attach_to_existing_process()')
1324 contents = _drone_manager.get_pidfile_contents(
1325 self.pidfile_id, use_second_read=use_second_read)
1326 if contents.is_invalid():
1327 self._state = drone_manager.PidfileContents()
1328 raise self._PidfileException(contents)
1329 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001330
1331
showard21baa452008-10-21 00:08:39 +00001332 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001333 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1334 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001335 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001336 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001337
1338
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, handling the race where the
        process exits between our liveness check and the first pidfile read
        by re-reading the pidfile a second time.
        """
        if self.lost_process:
            # already given up on this process; state is frozen
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001361
showard21baa452008-10-21 00:08:39 +00001362
1363 def _get_pidfile_info(self):
1364 """\
1365 After completion, self._state will contain:
1366 pid=None, exit_status=None if autoserv has not yet run
1367 pid!=None, exit_status=None if autoserv is running
1368 pid!=None, exit_status!=None if autoserv has completed
1369 """
1370 try:
1371 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001372 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001373 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001374
1375
showard170873e2009-01-07 00:22:26 +00001376 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001377 """\
1378 Called when no pidfile is found or no pid is in the pidfile.
1379 """
showard170873e2009-01-07 00:22:26 +00001380 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001381 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001382 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001383 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001384 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001385
1386
showard35162b02009-03-03 02:17:30 +00001387 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001388 """\
1389 Called when autoserv has exited without writing an exit status,
1390 or we've timed out waiting for autoserv to write a pid to the
1391 pidfile. In either case, we just return failure and the caller
1392 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001393
showard170873e2009-01-07 00:22:26 +00001394 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001395 """
1396 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001397 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001398 self._state.exit_status = 1
1399 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001400
1401
jadmanski0afbb632008-06-06 21:10:57 +00001402 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001403 self._get_pidfile_info()
1404 return self._state.exit_status
1405
1406
1407 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001408 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001409 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001410 if self._state.num_tests_failed is None:
1411 return -1
showard21baa452008-10-21 00:08:39 +00001412 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001413
1414
showardcdaeae82009-08-31 18:32:48 +00001415 def try_copy_results_on_drone(self, **kwargs):
1416 if self.has_process():
1417 # copy results logs into the normal place for job results
1418 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1419
1420
1421 def try_copy_to_results_repository(self, source, **kwargs):
1422 if self.has_process():
1423 _drone_manager.copy_to_results_repository(self.get_process(),
1424 source, **kwargs)
1425
1426
mbligh36768f02008-02-22 18:28:33 +00001427class Agent(object):
showard77182562009-06-10 00:16:05 +00001428 """
showard8cc058f2009-09-08 16:26:33 +00001429 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001430
1431 The following methods are required on all task objects:
1432 poll() - Called periodically to let the task check its status and
1433 update its internal state. If the task succeeded.
1434 is_done() - Returns True if the task is finished.
1435 abort() - Called when an abort has been requested. The task must
1436 set its aborted attribute to True if it actually aborted.
1437
1438 The following attributes are required on all task objects:
1439 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001440 success - bool, True if this task succeeded.
1441 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1442 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001443 """
1444
1445
showard418785b2009-11-23 20:19:59 +00001446 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001447 """
showard8cc058f2009-09-08 16:26:33 +00001448 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001449 """
showard8cc058f2009-09-08 16:26:33 +00001450 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001451
showard77182562009-06-10 00:16:05 +00001452 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001453 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001454
showard8cc058f2009-09-08 16:26:33 +00001455 self.queue_entry_ids = task.queue_entry_ids
1456 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001457
showard8cc058f2009-09-08 16:26:33 +00001458 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001459 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001460
1461
jadmanski0afbb632008-06-06 21:10:57 +00001462 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001463 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001464 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001465 self.task.poll()
1466 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001467 self.finished = True
showardec113162008-05-08 00:52:49 +00001468
1469
jadmanski0afbb632008-06-06 21:10:57 +00001470 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001471 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001472
1473
showardd3dc1992009-04-22 21:01:40 +00001474 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001475 if self.task:
1476 self.task.abort()
1477 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001478 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001479 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001480
showardd3dc1992009-04-22 21:01:40 +00001481
mbligh36768f02008-02-22 18:28:33 +00001482class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001483 class _NullMonitor(object):
1484 pidfile_id = None
1485
1486 def has_process(self):
1487 return True
1488
1489
1490 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001491 """
showardd1195652009-12-08 22:21:02 +00001492 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001493 """
jadmanski0afbb632008-06-06 21:10:57 +00001494 self.done = False
showardd1195652009-12-08 22:21:02 +00001495 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001496 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001497 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001498 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001499 self.queue_entry_ids = []
1500 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001501 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001502
1503
1504 def _set_ids(self, host=None, queue_entries=None):
1505 if queue_entries and queue_entries != [None]:
1506 self.host_ids = [entry.host.id for entry in queue_entries]
1507 self.queue_entry_ids = [entry.id for entry in queue_entries]
1508 else:
1509 assert host
1510 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def poll(self):
showard08a36412009-05-05 01:01:13 +00001514 if not self.started:
1515 self.start()
showardd1195652009-12-08 22:21:02 +00001516 if not self.done:
1517 self.tick()
showard08a36412009-05-05 01:01:13 +00001518
1519
1520 def tick(self):
showardd1195652009-12-08 22:21:02 +00001521 assert self.monitor
1522 exit_code = self.monitor.exit_code()
1523 if exit_code is None:
1524 return
mbligh36768f02008-02-22 18:28:33 +00001525
showardd1195652009-12-08 22:21:02 +00001526 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001527 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001528
1529
jadmanski0afbb632008-06-06 21:10:57 +00001530 def is_done(self):
1531 return self.done
mbligh36768f02008-02-22 18:28:33 +00001532
1533
jadmanski0afbb632008-06-06 21:10:57 +00001534 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001535 if self.done:
showardd1195652009-12-08 22:21:02 +00001536 assert self.started
showard08a36412009-05-05 01:01:13 +00001537 return
showardd1195652009-12-08 22:21:02 +00001538 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001539 self.done = True
1540 self.success = success
1541 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001542
1543
jadmanski0afbb632008-06-06 21:10:57 +00001544 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001545 """
1546 To be overridden.
1547 """
showarded2afea2009-07-07 20:54:07 +00001548 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001549 self.register_necessary_pidfiles()
1550
1551
1552 def _log_file(self):
1553 if not self._log_file_name:
1554 return None
1555 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001556
mbligh36768f02008-02-22 18:28:33 +00001557
jadmanski0afbb632008-06-06 21:10:57 +00001558 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001559 log_file = self._log_file()
1560 if self.monitor and log_file:
1561 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001562
1563
jadmanski0afbb632008-06-06 21:10:57 +00001564 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001565 """
1566 To be overridden.
1567 """
jadmanski0afbb632008-06-06 21:10:57 +00001568 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001569 logging.info("%s finished with success=%s", type(self).__name__,
1570 self.success)
1571
mbligh36768f02008-02-22 18:28:33 +00001572
1573
jadmanski0afbb632008-06-06 21:10:57 +00001574 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001575 if not self.started:
1576 self.prolog()
1577 self.run()
1578
1579 self.started = True
1580
1581
1582 def abort(self):
1583 if self.monitor:
1584 self.monitor.kill()
1585 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001586 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001587 self.cleanup()
1588
1589
showarded2afea2009-07-07 20:54:07 +00001590 def _get_consistent_execution_path(self, execution_entries):
1591 first_execution_path = execution_entries[0].execution_path()
1592 for execution_entry in execution_entries[1:]:
1593 assert execution_entry.execution_path() == first_execution_path, (
1594 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1595 execution_entry,
1596 first_execution_path,
1597 execution_entries[0]))
1598 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001599
1600
showarded2afea2009-07-07 20:54:07 +00001601 def _copy_results(self, execution_entries, use_monitor=None):
1602 """
1603 @param execution_entries: list of objects with execution_path() method
1604 """
showard6d1c1432009-08-20 23:30:39 +00001605 if use_monitor is not None and not use_monitor.has_process():
1606 return
1607
showarded2afea2009-07-07 20:54:07 +00001608 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001609 if use_monitor is None:
1610 assert self.monitor
1611 use_monitor = self.monitor
1612 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001613 execution_path = self._get_consistent_execution_path(execution_entries)
1614 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001615 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001616
showarda1e74b32009-05-12 17:32:04 +00001617
1618 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001619 for queue_entry in queue_entries:
1620 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001621
1622
mbligh4608b002010-01-05 18:22:35 +00001623 def _archive_results(self, queue_entries):
1624 for queue_entry in queue_entries:
1625 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001626
1627
showardd1195652009-12-08 22:21:02 +00001628 def _command_line(self):
1629 """
1630 Return the command line to run. Must be overridden.
1631 """
1632 raise NotImplementedError
1633
1634
1635 @property
1636 def num_processes(self):
1637 """
1638 Return the number of processes forked by this AgentTask's process. It
1639 may only be approximate. To be overridden if necessary.
1640 """
1641 return 1
1642
1643
1644 def _paired_with_monitor(self):
1645 """
1646 If this AgentTask's process must run on the same machine as some
1647 previous process, this method should be overridden to return a
1648 PidfileRunMonitor for that process.
1649 """
1650 return self._NullMonitor()
1651
1652
1653 @property
1654 def owner_username(self):
1655 """
1656 Return login of user responsible for this task. May be None. Must be
1657 overridden.
1658 """
1659 raise NotImplementedError
1660
1661
1662 def _working_directory(self):
1663 """
1664 Return the directory where this AgentTask's process executes. Must be
1665 overridden.
1666 """
1667 raise NotImplementedError
1668
1669
1670 def _pidfile_name(self):
1671 """
1672 Return the name of the pidfile this AgentTask's process uses. To be
1673 overridden if necessary.
1674 """
jamesrenc44ae992010-02-19 00:12:54 +00001675 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001676
1677
1678 def _check_paired_results_exist(self):
1679 if not self._paired_with_monitor().has_process():
1680 email_manager.manager.enqueue_notify_email(
1681 'No paired results in task',
1682 'No paired results in task %s at %s'
1683 % (self, self._paired_with_monitor().pidfile_id))
1684 self.finished(False)
1685 return False
1686 return True
1687
1688
1689 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001690 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001691 self.monitor = PidfileRunMonitor()
1692
1693
1694 def run(self):
1695 if not self._check_paired_results_exist():
1696 return
1697
1698 self._create_monitor()
1699 self.monitor.run(
1700 self._command_line(), self._working_directory(),
1701 num_processes=self.num_processes,
1702 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1703 pidfile_name=self._pidfile_name(),
1704 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001705 username=self.owner_username,
1706 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1707
1708
1709 def get_drone_hostnames_allowed(self):
1710 if not models.DroneSet.drone_sets_enabled():
1711 return None
1712
1713 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1714 if not hqes:
1715 # Only special tasks could be missing host queue entries
1716 assert isinstance(self, SpecialAgentTask)
1717 return self._user_or_global_default_drone_set(
1718 self.task, self.task.requested_by)
1719
1720 job_ids = hqes.values_list('job', flat=True).distinct()
1721 assert job_ids.count() == 1, ("AgentTask's queue entries "
1722 "span multiple jobs")
1723
1724 job = models.Job.objects.get(id=job_ids[0])
1725 drone_set = job.drone_set
1726 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001727 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001728
1729 return drone_set.get_drone_hostnames()
1730
1731
1732 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1733 """
1734 Returns the user's default drone set, if present.
1735
1736 Otherwise, returns the global default drone set.
1737 """
1738 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1739 if not user:
1740 logging.warn('%s had no owner; using default drone set',
1741 obj_with_owner)
1742 return default_hostnames
1743 if not user.drone_set:
1744 logging.warn('User %s has no default drone set, using global '
1745 'default', user.login)
1746 return default_hostnames
1747 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001748
1749
1750 def register_necessary_pidfiles(self):
1751 pidfile_id = _drone_manager.get_pidfile_id_from(
1752 self._working_directory(), self._pidfile_name())
1753 _drone_manager.register_pidfile(pidfile_id)
1754
1755 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1756 if paired_pidfile_id:
1757 _drone_manager.register_pidfile(paired_pidfile_id)
1758
1759
1760 def recover(self):
1761 if not self._check_paired_results_exist():
1762 return
1763
1764 self._create_monitor()
1765 self.monitor.attach_to_existing_process(
1766 self._working_directory(), pidfile_name=self._pidfile_name(),
1767 num_processes=self.num_processes)
1768 if not self.monitor.has_process():
1769 # no process to recover; wait to be started normally
1770 self.monitor = None
1771 return
1772
1773 self.started = True
1774 logging.info('Recovering process %s for %s at %s'
1775 % (self.monitor.get_process(), type(self).__name__,
1776 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001777
1778
mbligh4608b002010-01-05 18:22:35 +00001779 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1780 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001781 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001782 for entry in queue_entries:
1783 if entry.status not in allowed_hqe_statuses:
jamesrenb8f3f352010-06-10 00:44:06 +00001784 raise SchedulerError('%s attempting to start '
mbligh4608b002010-01-05 18:22:35 +00001785 'entry with invalid status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001786 % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001787 invalid_host_status = (
1788 allowed_host_statuses is not None
1789 and entry.host.status not in allowed_host_statuses)
1790 if invalid_host_status:
jamesrenb8f3f352010-06-10 00:44:06 +00001791 raise SchedulerError('%s attempting to start on queue '
mbligh4608b002010-01-05 18:22:35 +00001792 'entry with invalid host status %s: %s'
jamesrenb8f3f352010-06-10 00:44:06 +00001793 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001794
1795
showardd9205182009-04-27 20:09:55 +00001796class TaskWithJobKeyvals(object):
1797 """AgentTask mixin providing functionality to help with job keyval files."""
1798 _KEYVAL_FILE = 'keyval'
1799 def _format_keyval(self, key, value):
1800 return '%s=%s' % (key, value)
1801
1802
1803 def _keyval_path(self):
1804 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001805 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001806
1807
1808 def _write_keyval_after_job(self, field, value):
1809 assert self.monitor
1810 if not self.monitor.has_process():
1811 return
1812 _drone_manager.write_lines_to_file(
1813 self._keyval_path(), [self._format_keyval(field, value)],
1814 paired_with_process=self.monitor.get_process())
1815
1816
1817 def _job_queued_keyval(self, job):
1818 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1819
1820
1821 def _write_job_finished(self):
1822 self._write_keyval_after_job("job_finished", int(time.time()))
1823
1824
showarddb502762009-09-09 15:31:20 +00001825 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1826 keyval_contents = '\n'.join(self._format_keyval(key, value)
1827 for key, value in keyval_dict.iteritems())
1828 # always end with a newline to allow additional keyvals to be written
1829 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001830 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001831 keyval_contents,
1832 file_path=keyval_path)
1833
1834
1835 def _write_keyvals_before_job(self, keyval_dict):
1836 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1837
1838
1839 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001840 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001841 host.hostname)
1842 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001843 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001844 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1845 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1846
1847
showard8cc058f2009-09-08 16:26:33 +00001848class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001849 """
1850 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1851 """
1852
1853 TASK_TYPE = None
1854 host = None
1855 queue_entry = None
1856
showardd1195652009-12-08 22:21:02 +00001857 def __init__(self, task, extra_command_args):
1858 super(SpecialAgentTask, self).__init__()
1859
lmrb7c5d272010-04-16 06:34:04 +00001860 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001861
jamesrenc44ae992010-02-19 00:12:54 +00001862 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001863 self.queue_entry = None
1864 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001865 self.queue_entry = scheduler_models.HostQueueEntry(
1866 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001867
showarded2afea2009-07-07 20:54:07 +00001868 self.task = task
1869 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001870
1871
showard8cc058f2009-09-08 16:26:33 +00001872 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001873 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1874
1875
1876 def _command_line(self):
1877 return _autoserv_command_line(self.host.hostname,
1878 self._extra_command_args,
1879 queue_entry=self.queue_entry)
1880
1881
1882 def _working_directory(self):
1883 return self.task.execution_path()
1884
1885
1886 @property
1887 def owner_username(self):
1888 if self.task.requested_by:
1889 return self.task.requested_by.login
1890 return None
showard8cc058f2009-09-08 16:26:33 +00001891
1892
showarded2afea2009-07-07 20:54:07 +00001893 def prolog(self):
1894 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001895 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001896 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001897
1898
showardde634ee2009-01-30 01:44:24 +00001899 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001900 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001901
showard2fe3f1d2009-07-06 20:19:11 +00001902 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001903 return # don't fail metahost entries, they'll be reassigned
1904
showard2fe3f1d2009-07-06 20:19:11 +00001905 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001906 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001907 return # entry has been aborted
1908
showard2fe3f1d2009-07-06 20:19:11 +00001909 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001910 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001911 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001912 self._write_keyval_after_job(queued_key, queued_time)
1913 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001914
showard8cc058f2009-09-08 16:26:33 +00001915 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001916 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001917 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001918 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001919
showard8cc058f2009-09-08 16:26:33 +00001920 pidfile_id = _drone_manager.get_pidfile_id_from(
1921 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001922 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001923 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001924
1925 if self.queue_entry.job.parse_failed_repair:
1926 self._parse_results([self.queue_entry])
1927 else:
1928 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001929
1930
1931 def cleanup(self):
1932 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001933
1934 # We will consider an aborted task to be "Failed"
1935 self.task.finish(bool(self.success))
1936
showardf85a0b72009-10-07 20:48:45 +00001937 if self.monitor:
1938 if self.monitor.has_process():
1939 self._copy_results([self.task])
1940 if self.monitor.pidfile_id is not None:
1941 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001942
1943
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # Resolve the host's protection level to autoserv's normalized
        # attribute name before passing it on the command line.
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001980
1981
showarded2afea2009-07-07 20:54:07 +00001982class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001983 def _copy_to_results_repository(self):
1984 if not self.queue_entry or self.queue_entry.meta_host:
1985 return
1986
1987 self.queue_entry.set_execution_subdir()
1988 log_name = os.path.basename(self.task.execution_path())
1989 source = os.path.join(self.task.execution_path(), 'debug',
1990 'autoserv.DEBUG')
1991 destination = os.path.join(
1992 self.queue_entry.execution_path(), log_name)
1993
1994 self.monitor.try_copy_to_results_repository(
1995 source, destination_path=destination)
1996
1997
showard170873e2009-01-07 00:22:26 +00001998 def epilog(self):
1999 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00002000
showard775300b2009-09-09 15:30:50 +00002001 if self.success:
2002 return
showard8fe93b52008-11-18 17:53:22 +00002003
showard775300b2009-09-09 15:30:50 +00002004 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00002005
showard775300b2009-09-09 15:30:50 +00002006 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00002007 # effectively ignore failure for these hosts
2008 self.success = True
showard775300b2009-09-09 15:30:50 +00002009 return
2010
2011 if self.queue_entry:
2012 self.queue_entry.requeue()
2013
2014 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00002015 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00002016 queue_entry__id=self.queue_entry.id):
2017 self.host.set_status(models.Host.Status.REPAIR_FAILED)
2018 self._fail_queue_entry()
2019 return
2020
showard9bb960b2009-11-19 01:02:11 +00002021 queue_entry = models.HostQueueEntry.objects.get(
2022 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002023 else:
2024 queue_entry = None
2025
2026 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002027 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002028 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002029 queue_entry=queue_entry,
2030 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002031
showard8fe93b52008-11-18 17:53:22 +00002032
class VerifyTask(PreJobTask):
    """Pre-job special task that runs autoserv's verify phase on a host."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Mark host/entry as verifying and drop redundant queued verifies."""
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # A single verify covers any manually-requested reverifies queued for
        # this host, so discard the others instead of running them all.
        redundant = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False,
                queue_entry=None).exclude(id=self.task.id)
        redundant.delete()


    def epilog(self):
        """On success, advance the queue entry (or just free the host)."""
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
class CleanupTask(PreJobTask):
    """Pre-job special task that runs autoserv's cleanup phase on a host."""
    # Note: cleanup can also run after a job finishes, but in that case it
    # runs standalone against the host (unrelated to the job), so it is not
    # considered a PostJobTask.

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Mark the host as cleaning and the entry (if any) as verifying."""
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup tied to a queue entry, either schedule
        a verify task or advance the entry straight to pending."""
        if not self.queue_entry or not self.success:
            return

        no_verify = host_protections.Protection.DO_NOT_VERIFY
        wants_verify = (self.queue_entry.job.run_verify
                        and self.host.protection != no_verify)
        if wants_verify:
            hqe = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
            models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=self.host.id),
                    queue_entry=hqe,
                    task=models.SpecialTask.Task.VERIFY)
        else:
            self.queue_entry.on_pending()


    def epilog(self):
        """On success, clear the host's dirty flag and mark it ready."""
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        # All entries share one job; the first entry is representative.
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # Keyval file lives in the shared execution directory for the job.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution directory via the
        drone manager; returns the path the drone will see."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line for this job's queue entries."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no machine to the -m list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        # -c marks the control file as client-side.
        if not self.job.is_server_job():
            params.append('-c')

        return params


    @property
    def num_processes(self):
        # One process slot per queue entry, for drone accounting.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        # All entries must agree on their execution path; asserts consistency.
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all entries as running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leaves a marker file in the results so the loss of the autoserv
        # process is visible in the job's output.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job-finished keyvals; no-op if the task never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Appends an INFO line to the job's status.log via the drone manager.
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        """Abort the job: kill the process, then log and finalize."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
class QueueTask(AbstractQueueTask):
    """Runs a job's autoserv process against its assigned hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check DB state, then mark hosts running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)

        single_machine = (self.job.synch_count == 1
                          and len(self.queue_entries) == 1)
        if single_machine:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        """Move entries to GATHERING; hosts stay RUNNING for log collection."""
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
            entry.host.set_status(models.Host.Status.RUNNING)
class HostlessQueueTask(AbstractQueueTask):
    """Queue task for a single host-less (server-side only) queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        """Pin the execution subdir (no host to name it after), then start."""
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        """After finishing, hand the single entry off to the parser."""
        super(HostlessQueueTask, self)._finish_task()
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
class PostJobTask(AgentTask):
    """
    Base for tasks that run after a job's autoserv process has exited
    (log gathering, parsing, archiving).  Attaches a PidfileRunMonitor to
    the finished autoserv process so its exit status can be inspected.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # Monitor for the already-finished autoserv run, used to read its
        # exit code and process info.
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # In testing mode run a no-op command instead of the real tool.
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command list to run over results_dir.

        Must be overridden by subclasses.
        """
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        # All entries must share one execution path; asserts consistency.
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        # Output files are paired with the original autoserv process, not
        # with this post-job process.
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if the job's entries were aborted.

        All entries are expected to agree; on inconsistency, emails a
        warning and conservatively treats the job as aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                entries = ['%s (aborted: %s)' % (entry, entry.aborted)
                           for entry in self.queue_entries]
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state:\n' +
                        '\n'.join(entries))
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job's outcome to a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # Re-invoke autoserv against the existing results directory in
        # crashinfo-collection mode for all of the job's hosts.
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        # Entries must be GATHERING and hosts RUNNING before we collect logs.
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Decide per-job whether hosts get a cleanup (reboot) or go READY.

        Reboot happens for aborted jobs, for jobs with reboot_after=ALWAYS,
        or with reboot_after=IF_ALL_TESTS_PASSED when the job completed with
        zero test failures.
        """
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # No autoserv process could be found -- treat as a failed run.
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # Class-wide count of processes currently running across all instances
    # of a given subclass (each subclass gets its own counter via the
    # classmethods below, since += rebinds the attribute on cls).
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        # Subclasses return their configured process cap.
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        # self.monitor is set once the process has actually been launched.
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        # Respect the global cap; if launch succeeds, account for it.
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        # Only decrement if we ever incremented (i.e. the process launched).
        if self._process_started():
            self._decrement_running_processes()
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the results parser over a finished job, throttled globally."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # deliberately skip _set_ids: host_ids must stay unset for this task
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _generate_command(self, results_dir):
        """Build the parser invocation for the given results directory."""
        command = [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o']
        command.append(results_dir)
        return command


    @property
    def num_processes(self):
        # parser processes are deliberately excluded from drone accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        """Verify all entries are in PARSING before starting the parser."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        """Hand the entries off to the archiving stage once parsing is done."""
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """
    Runs the archiving control file over a job's results via autoserv,
    throttled globally by max_transfer_processes.
    """
    # marker file written into the results dir when archiving fails
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        # Run autoserv over the existing results with the stock archiving
        # control file shipped in the scheduler directory.
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        # Entries must already be in ARCHIVING before this task starts.
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        """On failure, drop a marker file with the exit code; in all cases
        move every entry to its final status."""
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            # NOTE(review): the guard checks _paired_with_monitor() (the
            # original autoserv run) but the message reads
            # self.monitor.exit_code() (the archiver run) -- looks
            # intentional but asymmetric; confirm self.monitor is always set
            # when success is False.
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this module.
    main()