blob: 98e56b4210e63436db455380b8526b915e2fb807 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
# Pidfile prefixes used with utils.write_pid()/program_is_alive() to detect an
# already-running scheduler (or its babysitter wrapper).
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'                      # overwritten from argv in main
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# (was: os.environ.has_key(...) -- has_key is deprecated, use `in`)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None             # scheduler-wide DatabaseConnection, set in initialize()
_shutdown = False      # set True by handle_sigint() to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False  # enabled by --test; swaps in a dummy autoserv
_drone_manager = None  # drone_manager instance, set in initialize_globals()
mbligh36768f02008-02-22 18:28:33 +000059
60
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
# Site hook: callable returning extra metahost schedulers for this deployment.
# Falls back to an empty tuple when no site-specific module exists.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
def _verify_default_drone_set_exists():
    """Raise SchedulerError when drone sets are enabled without a default."""
    if not models.DroneSet.drone_sets_enabled():
        return
    if not models.DroneSet.default_drone_set_name():
        raise SchedulerError('Drone sets are enabled, but no default is set')
81
82
def _sanity_check():
    """Run pre-start consistency checks on the scheduler configuration."""
    _verify_default_drone_set_exists()
87
def main():
    # Top-level entry point: run the scheduler, log anything that escapes,
    # and always remove our pidfile on the way out.
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            # deliberate exits (e.g. scheduler disabled) pass through unlogged
            raise
        except:
            # bare except is intentional: log absolutely everything, then
            # re-raise so the process still dies with the original error
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000099
100
def main_without_exception_handling():
    """Parse arguments, initialize all scheduler state, and run the tick loop.

    Exits via sys.exit(1) when the scheduler is disabled in global_config.
    Any exception escaping the tick loop is mailed out before shutdown.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # fixed grammar: "to enabled it" -> "to enable it"
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # mail the stacktrace rather than crash silently; shutdown proceeds
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000164
165
def setup_logging():
    """Configure scheduler logging, honoring env-var overrides for dir/name."""
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
172
173
def handle_sigint(signum, frame):
    # SIGINT handler: flag the main loop to exit after its current tick.
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
showardb18134f2009-03-20 20:52:18 +0000177 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000178
179
def initialize():
    """One-time scheduler startup: pidfile, DB, signal handler, drones."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # refuse to start a second scheduler instance
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # point the DB layer at the stress-test database instead of production
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # NOTE(review): this local `drones` shadows the module-level `drones`
    # import for the rest of this function — intentional but fragile.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000217
218
def initialize_globals():
    # bind the process-wide drone manager singleton used throughout this module
    global _drone_manager
    _drone_manager = drone_manager.instance()
222
223
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
248
249
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
    pass
252
253
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for every host usable this cycle: unlocked,
        no active queue entry, and status Ready (or NULL)."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string for an SQL IN (...) clause
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (with %s filled by id_list) and fold the
        rows into a dict of sets; see _process_many2many_dict."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_ids)}.
        With flip=True the two columns swap roles."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids of the job's owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids it is barred from."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """@returns (labels_to_hosts, hosts_to_labels) dicts of id sets for
        the given host ids; both empty when host_ids is empty."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label object for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        # let each metahost scheduler recover its own state at startup
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all cached scheduling state for this tick.

        @param pending_queue_entries: the HostQueueEntries awaiting
                scheduling; only jobs referenced by them are loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        # propagate the scheduler tick to each metahost scheduler
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return the (copied) set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached host set for label_id."""
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove and return the available Host with the given id."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return the set of host ids ineligible for queue_entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        # job owner and host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every label the job depends on must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """Return False when the host carries an only_if_needed label that the
        job neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. If more than one atomic group is
        found in the set of labels, an error is logged and an arbitrary one
        of them is returned.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE(review): only logged, never raised; set.pop() below then
            # returns an arbitrary member of atomic_ids.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """Return True if host passes all eligibility checks for the entry:
        ACLs, job dependencies, only_if_needed labels and atomic groups."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # truthy only when the host is cached as available AND marked invalid
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's requested host if eligible, else
        None (also None if the host is no longer available)."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """Return True if the host may still be claimed this cycle."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Assign a host for queue_entry.

        Non-metahost entries return the claimed Host (or None); metahost
        entries are delegated to the first metahost scheduler that accepts
        them (which schedules as a side effect) and return None.

        @raises SchedulerError if no metahost scheduler accepts the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
655
656
showard170873e2009-01-07 00:22:26 +0000657class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000658 def __init__(self):
659 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000660 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000661 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000662 user_cleanup_time = scheduler_config.config.clean_interval
663 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
664 _db, user_cleanup_time)
665 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000666 self._host_agents = {}
667 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000668 self._tick_count = 0
669 self._last_garbage_stats_time = time.time()
670 self._seconds_between_garbage_stats = 60 * (
671 global_config.global_config.get_config_value(
672 scheduler_config.CONFIG_SECTION,
673 'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000674
mbligh36768f02008-02-22 18:28:33 +0000675
showard915958d2009-04-22 21:00:58 +0000676 def initialize(self, recover_hosts=True):
677 self._periodic_cleanup.initialize()
678 self._24hr_upkeep.initialize()
679
jadmanski0afbb632008-06-06 21:10:57 +0000680 # always recover processes
681 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000682
jadmanski0afbb632008-06-06 21:10:57 +0000683 if recover_hosts:
684 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000685
jamesrenc44ae992010-02-19 00:12:54 +0000686 self._host_scheduler.recovery_on_startup()
687
mbligh36768f02008-02-22 18:28:33 +0000688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000690 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000691 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000692 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000693 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000694 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000695 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000696 self._schedule_running_host_queue_entries()
697 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000698 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000699 self._handle_agents()
jamesrene21bf412010-02-26 02:30:07 +0000700 self._host_scheduler.tick()
showard170873e2009-01-07 00:22:26 +0000701 _drone_manager.execute_actions()
702 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000703 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000704 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000705
showard97aed502008-11-04 02:01:24 +0000706
mblighf3294cc2009-04-08 21:17:38 +0000707 def _run_cleanup(self):
708 self._periodic_cleanup.run_cleanup_maybe()
709 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000710
mbligh36768f02008-02-22 18:28:33 +0000711
showardf13a9e22009-12-18 22:54:09 +0000712 def _garbage_collection(self):
713 threshold_time = time.time() - self._seconds_between_garbage_stats
714 if threshold_time < self._last_garbage_stats_time:
715 # Don't generate these reports very often.
716 return
717
718 self._last_garbage_stats_time = time.time()
719 # Force a full level 0 collection (because we can, it doesn't hurt
720 # at this interval).
721 gc.collect()
722 logging.info('Logging garbage collector stats on tick %d.',
723 self._tick_count)
724 gc_stats._log_garbage_collector_stats()
725
726
showard170873e2009-01-07 00:22:26 +0000727 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
728 for object_id in object_ids:
729 agent_dict.setdefault(object_id, set()).add(agent)
730
731
732 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
733 for object_id in object_ids:
734 assert object_id in agent_dict
735 agent_dict[object_id].remove(agent)
736
737
showardd1195652009-12-08 22:21:02 +0000738 def add_agent_task(self, agent_task):
739 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000740 self._agents.append(agent)
741 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000742 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
743 self._register_agent_for_ids(self._queue_entry_agents,
744 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000745
showard170873e2009-01-07 00:22:26 +0000746
747 def get_agents_for_entry(self, queue_entry):
748 """
749 Find agents corresponding to the specified queue_entry.
750 """
showardd3dc1992009-04-22 21:01:40 +0000751 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000752
753
754 def host_has_agent(self, host):
755 """
756 Determine if there is currently an Agent present using this host.
757 """
758 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000759
760
jadmanski0afbb632008-06-06 21:10:57 +0000761 def remove_agent(self, agent):
762 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000763 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
764 agent)
765 self._unregister_agent_for_ids(self._queue_entry_agents,
766 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000767
768
showard8cc058f2009-09-08 16:26:33 +0000769 def _host_has_scheduled_special_task(self, host):
770 return bool(models.SpecialTask.objects.filter(host__id=host.id,
771 is_active=False,
772 is_complete=False))
773
774
jadmanski0afbb632008-06-06 21:10:57 +0000775 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000776 agent_tasks = self._create_recovery_agent_tasks()
777 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000778 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000779 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000780 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000781 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000782 self._reverify_remaining_hosts()
783 # reinitialize drones after killing orphaned processes, since they can
784 # leave around files when they die
785 _drone_manager.execute_actions()
786 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000787
showard170873e2009-01-07 00:22:26 +0000788
showardd1195652009-12-08 22:21:02 +0000789 def _create_recovery_agent_tasks(self):
790 return (self._get_queue_entry_agent_tasks()
791 + self._get_special_task_agent_tasks(is_active=True))
792
793
    def _get_queue_entry_agent_tasks(self):
        """
        Build AgentTasks for all active queue entries not already owned by
        an agent.  A synchronous job's group shares a single AgentTask, so
        entries already claimed by a group task are skipped.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks
showard170873e2009-01-07 00:22:26 +0000819
820
showardd1195652009-12-08 22:21:02 +0000821 def _get_special_task_agent_tasks(self, is_active=False):
822 special_tasks = models.SpecialTask.objects.filter(
823 is_active=is_active, is_complete=False)
824 return [self._get_agent_task_for_special_task(task)
825 for task in special_tasks]
826
827
828 def _get_agent_task_for_queue_entry(self, queue_entry):
829 """
830 Construct an AgentTask instance for the given active HostQueueEntry,
831 if one can currently run it.
832 @param queue_entry: a HostQueueEntry
833 @returns an AgentTask to run the queue entry
834 """
835 task_entries = queue_entry.job.get_group_entries(queue_entry)
836 self._check_for_duplicate_host_entries(task_entries)
837
838 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
839 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000840 if queue_entry.is_hostless():
841 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000842 return QueueTask(queue_entries=task_entries)
843 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
844 return GatherLogsTask(queue_entries=task_entries)
845 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
846 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000847 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
848 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000849
850 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000851 'invalid status %s: %s'
852 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000853
854
855 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000856 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
857 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000858 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000859 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000860 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000861 if using_host:
showardd1195652009-12-08 22:21:02 +0000862 self._assert_host_has_no_agent(task_entry)
863
864
865 def _assert_host_has_no_agent(self, entry):
866 """
867 @param entry: a HostQueueEntry or a SpecialTask
868 """
869 if self.host_has_agent(entry.host):
870 agent = tuple(self._host_agents.get(entry.host.id))[0]
871 raise SchedulerError(
872 'While scheduling %s, host %s already has a host agent %s'
873 % (entry, entry.host, agent.task))
874
875
876 def _get_agent_task_for_special_task(self, special_task):
877 """
878 Construct an AgentTask class to run the given SpecialTask and add it
879 to this dispatcher.
880 @param special_task: a models.SpecialTask instance
881 @returns an AgentTask to run this SpecialTask
882 """
883 self._assert_host_has_no_agent(special_task)
884
885 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
886 for agent_task_class in special_agent_task_classes:
887 if agent_task_class.TASK_TYPE == special_task.task:
888 return agent_task_class(task=special_task)
889
890 raise SchedulerError('No AgentTask class for task', str(special_task))
891
892
893 def _register_pidfiles(self, agent_tasks):
894 for agent_task in agent_tasks:
895 agent_task.register_necessary_pidfiles()
896
897
    def _recover_tasks(self, agent_tasks):
        """
        Recover and re-add each AgentTask; any autoserv process not claimed
        by some recovered task is reported as an orphan.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            # A task that reattached to a live process claims it, so it is
            # no longer an orphan.
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000908
909
showard8cc058f2009-09-08 16:26:33 +0000910 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000911 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
912 % status):
showard0db3d432009-10-12 20:29:15 +0000913 if entry.status == status and not self.get_agents_for_entry(entry):
914 # The status can change during iteration, e.g., if job.run()
915 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000916 yield entry
917
918
showard6878e8b2009-07-20 22:37:45 +0000919 def _check_for_remaining_orphan_processes(self, orphans):
920 if not orphans:
921 return
922 subject = 'Unrecovered orphan autoserv processes remain'
923 message = '\n'.join(str(process) for process in orphans)
924 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000925
926 die_on_orphans = global_config.global_config.get_config_value(
927 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
928
929 if die_on_orphans:
930 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000931
showard170873e2009-01-07 00:22:26 +0000932
showard8cc058f2009-09-08 16:26:33 +0000933 def _recover_pending_entries(self):
934 for entry in self._get_unassigned_entries(
935 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000936 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000937 entry.on_pending()
938
939
showardb8900452009-10-12 20:31:01 +0000940 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000941 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000942 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
943 unrecovered_hqes = []
944 for queue_entry in queue_entries:
945 special_tasks = models.SpecialTask.objects.filter(
946 task__in=(models.SpecialTask.Task.CLEANUP,
947 models.SpecialTask.Task.VERIFY),
948 queue_entry__id=queue_entry.id,
949 is_complete=False)
950 if special_tasks.count() == 0:
951 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000952
showardb8900452009-10-12 20:31:01 +0000953 if unrecovered_hqes:
954 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000955 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000956 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000957 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000958
959
showard65db3932009-10-28 19:54:35 +0000960 def _get_prioritized_special_tasks(self):
961 """
962 Returns all queued SpecialTasks prioritized for repair first, then
963 cleanup, then verify.
964 """
965 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
966 is_complete=False,
967 host__locked=False)
968 # exclude hosts with active queue entries unless the SpecialTask is for
969 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000970 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000971 queued_tasks, 'afe_host_queue_entries', 'host_id',
972 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000973 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000974 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000975 where=['(afe_host_queue_entries.id IS NULL OR '
976 'afe_host_queue_entries.id = '
977 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000978
showard65db3932009-10-28 19:54:35 +0000979 # reorder tasks by priority
980 task_priority_order = [models.SpecialTask.Task.REPAIR,
981 models.SpecialTask.Task.CLEANUP,
982 models.SpecialTask.Task.VERIFY]
983 def task_priority_key(task):
984 return task_priority_order.index(task.task)
985 return sorted(queued_tasks, key=task_priority_key)
986
987
showard65db3932009-10-28 19:54:35 +0000988 def _schedule_special_tasks(self):
989 """
990 Execute queued SpecialTasks that are ready to run on idle hosts.
991 """
992 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000993 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000994 continue
showardd1195652009-12-08 22:21:02 +0000995 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000996
997
showard170873e2009-01-07 00:22:26 +0000998 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000999 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +00001000 # should never happen
showarded2afea2009-07-07 20:54:07 +00001001 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +00001002 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +00001003 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +00001004 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +00001005 print_message=message)
mblighbb421852008-03-11 22:36:16 +00001006
1007
jadmanski0afbb632008-06-06 21:10:57 +00001008 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +00001009 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +00001010 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +00001011 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +00001012 if self.host_has_agent(host):
1013 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001014 continue
showard8cc058f2009-09-08 16:26:33 +00001015 if self._host_has_scheduled_special_task(host):
1016 # host will have a special task scheduled on the next cycle
1017 continue
showard170873e2009-01-07 00:22:26 +00001018 if print_message:
showardb18134f2009-03-20 20:52:18 +00001019 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001020 models.SpecialTask.objects.create(
1021 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001022 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001023
1024
jadmanski0afbb632008-06-06 21:10:57 +00001025 def _recover_hosts(self):
1026 # recover "Repair Failed" hosts
1027 message = 'Reverifying dead host %s'
1028 self._reverify_hosts_where("status = 'Repair Failed'",
1029 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001030
1031
showard04c82c52008-05-29 19:38:12 +00001032
showardb95b1bd2008-08-15 18:11:04 +00001033 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001034 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +00001035 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001036 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001037 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001038 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001039
1040
showard89f84db2009-03-12 20:39:13 +00001041 def _refresh_pending_queue_entries(self):
1042 """
1043 Lookup the pending HostQueueEntries and call our HostScheduler
1044 refresh() method given that list. Return the list.
1045
1046 @returns A list of pending HostQueueEntries sorted in priority order.
1047 """
showard63a34772008-08-18 19:32:50 +00001048 queue_entries = self._get_pending_queue_entries()
1049 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001050 return []
showardb95b1bd2008-08-15 18:11:04 +00001051
showard63a34772008-08-18 19:32:50 +00001052 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001053
showard89f84db2009-03-12 20:39:13 +00001054 return queue_entries
1055
1056
    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            # Persist the clone before binding a host to it.
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001087
1088
showarda9545c02009-12-18 22:44:26 +00001089 def _schedule_hostless_job(self, queue_entry):
1090 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +00001091 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +00001092
1093
showard89f84db2009-03-12 20:39:13 +00001094 def _schedule_new_jobs(self):
1095 queue_entries = self._refresh_pending_queue_entries()
1096 if not queue_entries:
1097 return
1098
showard63a34772008-08-18 19:32:50 +00001099 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001100 is_unassigned_atomic_group = (
1101 queue_entry.atomic_group_id is not None
1102 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001103
1104 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001105 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001106 elif is_unassigned_atomic_group:
1107 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001108 else:
jamesren883492a2010-02-12 00:45:18 +00001109 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001110 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001111 assert assigned_host.id == queue_entry.host_id
1112 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001113
1114
showard8cc058f2009-09-08 16:26:33 +00001115 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001116 for agent_task in self._get_queue_entry_agent_tasks():
1117 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001118
1119
    def _schedule_delay_tasks(self):
        # Waiting entries may need their job's delayed-callback task
        # (re)created; the job decides whether one is required.
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001126
1127
jamesren883492a2010-02-12 00:45:18 +00001128 def _run_queue_entry(self, queue_entry):
1129 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001130
1131
jadmanski0afbb632008-06-06 21:10:57 +00001132 def _find_aborting(self):
jamesrene7c65cb2010-06-08 20:38:10 +00001133 jobs_to_stop = set()
jamesrenc44ae992010-02-19 00:12:54 +00001134 for entry in scheduler_models.HostQueueEntry.fetch(
1135 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001136 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001137 for agent in self.get_agents_for_entry(entry):
1138 agent.abort()
1139 entry.abort(self)
jamesrene7c65cb2010-06-08 20:38:10 +00001140 jobs_to_stop.add(entry.job)
1141 for job in jobs_to_stop:
1142 job.stop_if_necessary()
jadmanski0afbb632008-06-06 21:10:57 +00001143
1144
showard324bf812009-01-20 23:23:38 +00001145 def _can_start_agent(self, agent, num_started_this_cycle,
1146 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001147 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001148 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001149 return True
1150 # don't allow any nonzero-process agents to run after we've reached a
1151 # limit (this avoids starvation of many-process agents)
1152 if have_reached_limit:
1153 return False
1154 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001155 max_runnable_processes = _drone_manager.max_runnable_processes(
jamesren76fcf192010-04-21 20:39:50 +00001156 agent.task.owner_username,
1157 agent.task.get_drone_hostnames_allowed())
showardd1195652009-12-08 22:21:02 +00001158 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001159 return False
1160 # if a single agent exceeds the per-cycle throttling, still allow it to
1161 # run when it's the first agent in the cycle
1162 if num_started_this_cycle == 0:
1163 return True
1164 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001165 if (num_started_this_cycle + agent.task.num_processes >
1166 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001167 return False
1168 return True
1169
1170
jadmanski0afbb632008-06-06 21:10:57 +00001171 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001172 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001173 have_reached_limit = False
1174 # iterate over copy, so we can remove agents during iteration
1175 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001176 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001177 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001178 have_reached_limit):
1179 have_reached_limit = True
1180 continue
showardd1195652009-12-08 22:21:02 +00001181 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001182 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001183 if agent.is_done():
1184 logging.info("agent finished")
1185 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001186 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001187 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001188
1189
showard29f7cd22009-04-29 21:16:24 +00001190 def _process_recurring_runs(self):
1191 recurring_runs = models.RecurringRun.objects.filter(
1192 start_date__lte=datetime.datetime.now())
1193 for rrun in recurring_runs:
1194 # Create job from template
1195 job = rrun.job
1196 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001197 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001198
1199 host_objects = info['hosts']
1200 one_time_hosts = info['one_time_hosts']
1201 metahost_objects = info['meta_hosts']
1202 dependencies = info['dependencies']
1203 atomic_group = info['atomic_group']
1204
1205 for host in one_time_hosts or []:
1206 this_host = models.Host.create_one_time_host(host.hostname)
1207 host_objects.append(this_host)
1208
1209 try:
1210 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001211 options=options,
showard29f7cd22009-04-29 21:16:24 +00001212 host_objects=host_objects,
1213 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001214 atomic_group=atomic_group)
1215
1216 except Exception, ex:
1217 logging.exception(ex)
1218 #TODO send email
1219
1220 if rrun.loop_count == 1:
1221 rrun.delete()
1222 else:
1223 if rrun.loop_count != 0: # if not infinite loop
1224 # calculate new start_date
1225 difference = datetime.timedelta(seconds=rrun.loop_period)
1226 rrun.start_date = rrun.start_date + difference
1227 rrun.loop_count -= 1
1228 rrun.save()
1229
1230
showard170873e2009-01-07 00:22:26 +00001231class PidfileRunMonitor(object):
1232 """
1233 Client must call either run() to start a new process or
1234 attach_to_existing_process().
1235 """
mbligh36768f02008-02-22 18:28:33 +00001236
showard170873e2009-01-07 00:22:26 +00001237 class _PidfileException(Exception):
1238 """
1239 Raised when there's some unexpected behavior with the pid file, but only
1240 used internally (never allowed to escape this class).
1241 """
mbligh36768f02008-02-22 18:28:33 +00001242
1243
showard170873e2009-01-07 00:22:26 +00001244 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001245 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001246 self._start_time = None
1247 self.pidfile_id = None
1248 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001249
1250
showard170873e2009-01-07 00:22:26 +00001251 def _add_nice_command(self, command, nice_level):
1252 if not nice_level:
1253 return command
1254 return ['nice', '-n', str(nice_level)] + command
1255
1256
    def _set_start_time(self):
        # Record when monitoring began.  NOTE(review): presumably consulted
        # elsewhere for pidfile-timeout checks -- usage not visible here.
        self._start_time = time.time()
1259
1260
showard418785b2009-11-23 20:19:59 +00001261 def run(self, command, working_directory, num_processes, nice_level=None,
1262 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001263 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001264 assert command is not None
1265 if nice_level is not None:
1266 command = ['nice', '-n', str(nice_level)] + command
1267 self._set_start_time()
1268 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001269 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001270 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001271 paired_with_pidfile=paired_with_pidfile, username=username,
1272 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001273
1274
showarded2afea2009-07-07 20:54:07 +00001275 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001276 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001277 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001278 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001279 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001280 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001281 if num_processes is not None:
1282 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001283
1284
jadmanski0afbb632008-06-06 21:10:57 +00001285 def kill(self):
showard170873e2009-01-07 00:22:26 +00001286 if self.has_process():
1287 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001288
mbligh36768f02008-02-22 18:28:33 +00001289
showard170873e2009-01-07 00:22:26 +00001290 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001291 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001292 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001293
1294
showard170873e2009-01-07 00:22:26 +00001295 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001296 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001297 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001298 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001299
1300
showard170873e2009-01-07 00:22:26 +00001301 def _read_pidfile(self, use_second_read=False):
1302 assert self.pidfile_id is not None, (
1303 'You must call run() or attach_to_existing_process()')
1304 contents = _drone_manager.get_pidfile_contents(
1305 self.pidfile_id, use_second_read=use_second_read)
1306 if contents.is_invalid():
1307 self._state = drone_manager.PidfileContents()
1308 raise self._PidfileException(contents)
1309 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001310
1311
showard21baa452008-10-21 00:08:39 +00001312 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001313 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1314 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001315 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001316 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001317
1318
    def _get_pidfile_info_helper(self):
        """Refresh self._state from the pidfile, detecting lost processes.

        No-op once the process has already been declared lost.
        """
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001341
showard21baa452008-10-21 00:08:39 +00001342
1343 def _get_pidfile_info(self):
1344 """\
1345 After completion, self._state will contain:
1346 pid=None, exit_status=None if autoserv has not yet run
1347 pid!=None, exit_status=None if autoserv is running
1348 pid!=None, exit_status!=None if autoserv has completed
1349 """
1350 try:
1351 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001352 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001353 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001354
1355
    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            # The process has had ample time to write its pidfile; assume it
            # never started (or died before writing one) and give up on it.
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001365
1366
showard35162b02009-03-03 02:17:30 +00001367 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001368 """\
1369 Called when autoserv has exited without writing an exit status,
1370 or we've timed out waiting for autoserv to write a pid to the
1371 pidfile. In either case, we just return failure and the caller
1372 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001373
showard170873e2009-01-07 00:22:26 +00001374 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001375 """
1376 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001377 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001378 self._state.exit_status = 1
1379 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001380
1381
jadmanski0afbb632008-06-06 21:10:57 +00001382 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001383 self._get_pidfile_info()
1384 return self._state.exit_status
1385
1386
1387 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001388 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001389 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001390 if self._state.num_tests_failed is None:
1391 return -1
showard21baa452008-10-21 00:08:39 +00001392 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001393
1394
showardcdaeae82009-08-31 18:32:48 +00001395 def try_copy_results_on_drone(self, **kwargs):
1396 if self.has_process():
1397 # copy results logs into the normal place for job results
1398 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1399
1400
1401 def try_copy_to_results_repository(self, source, **kwargs):
1402 if self.has_process():
1403 _drone_manager.copy_to_results_repository(self.get_process(),
1404 source, **kwargs)
1405
1406
mbligh36768f02008-02-22 18:28:33 +00001407class Agent(object):
showard77182562009-06-10 00:16:05 +00001408 """
showard8cc058f2009-09-08 16:26:33 +00001409 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001410
1411 The following methods are required on all task objects:
1412 poll() - Called periodically to let the task check its status and
1413 update its internal state. If the task succeeded.
1414 is_done() - Returns True if the task is finished.
1415 abort() - Called when an abort has been requested. The task must
1416 set its aborted attribute to True if it actually aborted.
1417
1418 The following attributes are required on all task objects:
1419 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001420 success - bool, True if this task succeeded.
1421 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1422 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001423 """
1424
1425
showard418785b2009-11-23 20:19:59 +00001426 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001427 """
showard8cc058f2009-09-08 16:26:33 +00001428 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001429 """
showard8cc058f2009-09-08 16:26:33 +00001430 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001431
showard77182562009-06-10 00:16:05 +00001432 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001433 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001434
showard8cc058f2009-09-08 16:26:33 +00001435 self.queue_entry_ids = task.queue_entry_ids
1436 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001437
showard8cc058f2009-09-08 16:26:33 +00001438 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001439 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001440
1441
jadmanski0afbb632008-06-06 21:10:57 +00001442 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001443 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001444 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001445 self.task.poll()
1446 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001447 self.finished = True
showardec113162008-05-08 00:52:49 +00001448
1449
jadmanski0afbb632008-06-06 21:10:57 +00001450 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001451 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001452
1453
showardd3dc1992009-04-22 21:01:40 +00001454 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001455 if self.task:
1456 self.task.abort()
1457 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001458 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001459 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001460
showardd3dc1992009-04-22 21:01:40 +00001461
mbligh36768f02008-02-22 18:28:33 +00001462class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001463 class _NullMonitor(object):
1464 pidfile_id = None
1465
1466 def has_process(self):
1467 return True
1468
1469
1470 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001471 """
showardd1195652009-12-08 22:21:02 +00001472 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001473 """
jadmanski0afbb632008-06-06 21:10:57 +00001474 self.done = False
showardd1195652009-12-08 22:21:02 +00001475 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001477 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001478 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001479 self.queue_entry_ids = []
1480 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001481 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001482
1483
1484 def _set_ids(self, host=None, queue_entries=None):
1485 if queue_entries and queue_entries != [None]:
1486 self.host_ids = [entry.host.id for entry in queue_entries]
1487 self.queue_entry_ids = [entry.id for entry in queue_entries]
1488 else:
1489 assert host
1490 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001491
1492
jadmanski0afbb632008-06-06 21:10:57 +00001493 def poll(self):
showard08a36412009-05-05 01:01:13 +00001494 if not self.started:
1495 self.start()
showardd1195652009-12-08 22:21:02 +00001496 if not self.done:
1497 self.tick()
showard08a36412009-05-05 01:01:13 +00001498
1499
1500 def tick(self):
showardd1195652009-12-08 22:21:02 +00001501 assert self.monitor
1502 exit_code = self.monitor.exit_code()
1503 if exit_code is None:
1504 return
mbligh36768f02008-02-22 18:28:33 +00001505
showardd1195652009-12-08 22:21:02 +00001506 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001507 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001508
1509
jadmanski0afbb632008-06-06 21:10:57 +00001510 def is_done(self):
1511 return self.done
mbligh36768f02008-02-22 18:28:33 +00001512
1513
jadmanski0afbb632008-06-06 21:10:57 +00001514 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001515 if self.done:
showardd1195652009-12-08 22:21:02 +00001516 assert self.started
showard08a36412009-05-05 01:01:13 +00001517 return
showardd1195652009-12-08 22:21:02 +00001518 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001519 self.done = True
1520 self.success = success
1521 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001522
1523
jadmanski0afbb632008-06-06 21:10:57 +00001524 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001525 """
1526 To be overridden.
1527 """
showarded2afea2009-07-07 20:54:07 +00001528 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001529 self.register_necessary_pidfiles()
1530
1531
1532 def _log_file(self):
1533 if not self._log_file_name:
1534 return None
1535 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001536
mbligh36768f02008-02-22 18:28:33 +00001537
jadmanski0afbb632008-06-06 21:10:57 +00001538 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001539 log_file = self._log_file()
1540 if self.monitor and log_file:
1541 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001542
1543
jadmanski0afbb632008-06-06 21:10:57 +00001544 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001545 """
1546 To be overridden.
1547 """
jadmanski0afbb632008-06-06 21:10:57 +00001548 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001549 logging.info("%s finished with success=%s", type(self).__name__,
1550 self.success)
1551
mbligh36768f02008-02-22 18:28:33 +00001552
1553
jadmanski0afbb632008-06-06 21:10:57 +00001554 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001555 if not self.started:
1556 self.prolog()
1557 self.run()
1558
1559 self.started = True
1560
1561
1562 def abort(self):
1563 if self.monitor:
1564 self.monitor.kill()
1565 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001566 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001567 self.cleanup()
1568
1569
showarded2afea2009-07-07 20:54:07 +00001570 def _get_consistent_execution_path(self, execution_entries):
1571 first_execution_path = execution_entries[0].execution_path()
1572 for execution_entry in execution_entries[1:]:
1573 assert execution_entry.execution_path() == first_execution_path, (
1574 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1575 execution_entry,
1576 first_execution_path,
1577 execution_entries[0]))
1578 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001579
1580
showarded2afea2009-07-07 20:54:07 +00001581 def _copy_results(self, execution_entries, use_monitor=None):
1582 """
1583 @param execution_entries: list of objects with execution_path() method
1584 """
showard6d1c1432009-08-20 23:30:39 +00001585 if use_monitor is not None and not use_monitor.has_process():
1586 return
1587
showarded2afea2009-07-07 20:54:07 +00001588 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001589 if use_monitor is None:
1590 assert self.monitor
1591 use_monitor = self.monitor
1592 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001593 execution_path = self._get_consistent_execution_path(execution_entries)
1594 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001595 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001596
showarda1e74b32009-05-12 17:32:04 +00001597
1598 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001599 for queue_entry in queue_entries:
1600 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001601
1602
mbligh4608b002010-01-05 18:22:35 +00001603 def _archive_results(self, queue_entries):
1604 for queue_entry in queue_entries:
1605 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001606
1607
showardd1195652009-12-08 22:21:02 +00001608 def _command_line(self):
1609 """
1610 Return the command line to run. Must be overridden.
1611 """
1612 raise NotImplementedError
1613
1614
1615 @property
1616 def num_processes(self):
1617 """
1618 Return the number of processes forked by this AgentTask's process. It
1619 may only be approximate. To be overridden if necessary.
1620 """
1621 return 1
1622
1623
1624 def _paired_with_monitor(self):
1625 """
1626 If this AgentTask's process must run on the same machine as some
1627 previous process, this method should be overridden to return a
1628 PidfileRunMonitor for that process.
1629 """
1630 return self._NullMonitor()
1631
1632
1633 @property
1634 def owner_username(self):
1635 """
1636 Return login of user responsible for this task. May be None. Must be
1637 overridden.
1638 """
1639 raise NotImplementedError
1640
1641
1642 def _working_directory(self):
1643 """
1644 Return the directory where this AgentTask's process executes. Must be
1645 overridden.
1646 """
1647 raise NotImplementedError
1648
1649
1650 def _pidfile_name(self):
1651 """
1652 Return the name of the pidfile this AgentTask's process uses. To be
1653 overridden if necessary.
1654 """
jamesrenc44ae992010-02-19 00:12:54 +00001655 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001656
1657
1658 def _check_paired_results_exist(self):
1659 if not self._paired_with_monitor().has_process():
1660 email_manager.manager.enqueue_notify_email(
1661 'No paired results in task',
1662 'No paired results in task %s at %s'
1663 % (self, self._paired_with_monitor().pidfile_id))
1664 self.finished(False)
1665 return False
1666 return True
1667
1668
1669 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001670 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001671 self.monitor = PidfileRunMonitor()
1672
1673
1674 def run(self):
1675 if not self._check_paired_results_exist():
1676 return
1677
1678 self._create_monitor()
1679 self.monitor.run(
1680 self._command_line(), self._working_directory(),
1681 num_processes=self.num_processes,
1682 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1683 pidfile_name=self._pidfile_name(),
1684 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001685 username=self.owner_username,
1686 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1687
1688
1689 def get_drone_hostnames_allowed(self):
1690 if not models.DroneSet.drone_sets_enabled():
1691 return None
1692
1693 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1694 if not hqes:
1695 # Only special tasks could be missing host queue entries
1696 assert isinstance(self, SpecialAgentTask)
1697 return self._user_or_global_default_drone_set(
1698 self.task, self.task.requested_by)
1699
1700 job_ids = hqes.values_list('job', flat=True).distinct()
1701 assert job_ids.count() == 1, ("AgentTask's queue entries "
1702 "span multiple jobs")
1703
1704 job = models.Job.objects.get(id=job_ids[0])
1705 drone_set = job.drone_set
1706 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001707 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001708
1709 return drone_set.get_drone_hostnames()
1710
1711
1712 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1713 """
1714 Returns the user's default drone set, if present.
1715
1716 Otherwise, returns the global default drone set.
1717 """
1718 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1719 if not user:
1720 logging.warn('%s had no owner; using default drone set',
1721 obj_with_owner)
1722 return default_hostnames
1723 if not user.drone_set:
1724 logging.warn('User %s has no default drone set, using global '
1725 'default', user.login)
1726 return default_hostnames
1727 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001728
1729
1730 def register_necessary_pidfiles(self):
1731 pidfile_id = _drone_manager.get_pidfile_id_from(
1732 self._working_directory(), self._pidfile_name())
1733 _drone_manager.register_pidfile(pidfile_id)
1734
1735 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1736 if paired_pidfile_id:
1737 _drone_manager.register_pidfile(paired_pidfile_id)
1738
1739
1740 def recover(self):
1741 if not self._check_paired_results_exist():
1742 return
1743
1744 self._create_monitor()
1745 self.monitor.attach_to_existing_process(
1746 self._working_directory(), pidfile_name=self._pidfile_name(),
1747 num_processes=self.num_processes)
1748 if not self.monitor.has_process():
1749 # no process to recover; wait to be started normally
1750 self.monitor = None
1751 return
1752
1753 self.started = True
1754 logging.info('Recovering process %s for %s at %s'
1755 % (self.monitor.get_process(), type(self).__name__,
1756 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001757
1758
mbligh4608b002010-01-05 18:22:35 +00001759 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1760 allowed_host_statuses=None):
1761 for entry in queue_entries:
1762 if entry.status not in allowed_hqe_statuses:
1763 raise SchedulerError('Queue task attempting to start '
1764 'entry with invalid status %s: %s'
1765 % (entry.status, entry))
1766 invalid_host_status = (
1767 allowed_host_statuses is not None
1768 and entry.host.status not in allowed_host_statuses)
1769 if invalid_host_status:
1770 raise SchedulerError('Queue task attempting to start on queue '
1771 'entry with invalid host status %s: %s'
1772 % (entry.host.status, entry))
1773
1774
showardd9205182009-04-27 20:09:55 +00001775class TaskWithJobKeyvals(object):
1776 """AgentTask mixin providing functionality to help with job keyval files."""
1777 _KEYVAL_FILE = 'keyval'
1778 def _format_keyval(self, key, value):
1779 return '%s=%s' % (key, value)
1780
1781
1782 def _keyval_path(self):
1783 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001784 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001785
1786
1787 def _write_keyval_after_job(self, field, value):
1788 assert self.monitor
1789 if not self.monitor.has_process():
1790 return
1791 _drone_manager.write_lines_to_file(
1792 self._keyval_path(), [self._format_keyval(field, value)],
1793 paired_with_process=self.monitor.get_process())
1794
1795
1796 def _job_queued_keyval(self, job):
1797 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1798
1799
1800 def _write_job_finished(self):
1801 self._write_keyval_after_job("job_finished", int(time.time()))
1802
1803
showarddb502762009-09-09 15:31:20 +00001804 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1805 keyval_contents = '\n'.join(self._format_keyval(key, value)
1806 for key, value in keyval_dict.iteritems())
1807 # always end with a newline to allow additional keyvals to be written
1808 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001809 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001810 keyval_contents,
1811 file_path=keyval_path)
1812
1813
1814 def _write_keyvals_before_job(self, keyval_dict):
1815 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1816
1817
1818 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001819 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001820 host.hostname)
1821 platform, all_labels = host.platform_and_labels()
1822 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1823 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1824
1825
showard8cc058f2009-09-08 16:26:33 +00001826class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001827 """
1828 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1829 """
1830
1831 TASK_TYPE = None
1832 host = None
1833 queue_entry = None
1834
showardd1195652009-12-08 22:21:02 +00001835 def __init__(self, task, extra_command_args):
1836 super(SpecialAgentTask, self).__init__()
1837
lmrb7c5d272010-04-16 06:34:04 +00001838 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001839
jamesrenc44ae992010-02-19 00:12:54 +00001840 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001841 self.queue_entry = None
1842 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001843 self.queue_entry = scheduler_models.HostQueueEntry(
1844 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001845
showarded2afea2009-07-07 20:54:07 +00001846 self.task = task
1847 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001848
1849
showard8cc058f2009-09-08 16:26:33 +00001850 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001851 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1852
1853
1854 def _command_line(self):
1855 return _autoserv_command_line(self.host.hostname,
1856 self._extra_command_args,
1857 queue_entry=self.queue_entry)
1858
1859
1860 def _working_directory(self):
1861 return self.task.execution_path()
1862
1863
1864 @property
1865 def owner_username(self):
1866 if self.task.requested_by:
1867 return self.task.requested_by.login
1868 return None
showard8cc058f2009-09-08 16:26:33 +00001869
1870
showarded2afea2009-07-07 20:54:07 +00001871 def prolog(self):
1872 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001873 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001874 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001875
1876
showardde634ee2009-01-30 01:44:24 +00001877 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001878 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001879
showard2fe3f1d2009-07-06 20:19:11 +00001880 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001881 return # don't fail metahost entries, they'll be reassigned
1882
showard2fe3f1d2009-07-06 20:19:11 +00001883 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001884 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001885 return # entry has been aborted
1886
showard2fe3f1d2009-07-06 20:19:11 +00001887 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001888 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001889 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001890 self._write_keyval_after_job(queued_key, queued_time)
1891 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001892
showard8cc058f2009-09-08 16:26:33 +00001893 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001894 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001895 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001896 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001897
showard8cc058f2009-09-08 16:26:33 +00001898 pidfile_id = _drone_manager.get_pidfile_id_from(
1899 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001900 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001901 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001902
1903 if self.queue_entry.job.parse_failed_repair:
1904 self._parse_results([self.queue_entry])
1905 else:
1906 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001907
1908
1909 def cleanup(self):
1910 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001911
1912 # We will consider an aborted task to be "Failed"
1913 self.task.finish(bool(self.success))
1914
showardf85a0b72009-10-07 20:48:45 +00001915 if self.monitor:
1916 if self.monitor.has_process():
1917 self._copy_results([self.task])
1918 if self.monitor.pidfile_id is not None:
1919 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001920
1921
class RepairTask(SpecialAgentTask):
    """Runs autoserv -R to repair a host, failing the queue entry on failure."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # look up the host's protection level and normalize it to the
        # attribute-style name autoserv expects
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                raw_protection)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001958
1959
showarded2afea2009-07-07 20:54:07 +00001960class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001961 def _copy_to_results_repository(self):
1962 if not self.queue_entry or self.queue_entry.meta_host:
1963 return
1964
1965 self.queue_entry.set_execution_subdir()
1966 log_name = os.path.basename(self.task.execution_path())
1967 source = os.path.join(self.task.execution_path(), 'debug',
1968 'autoserv.DEBUG')
1969 destination = os.path.join(
1970 self.queue_entry.execution_path(), log_name)
1971
1972 self.monitor.try_copy_to_results_repository(
1973 source, destination_path=destination)
1974
1975
showard170873e2009-01-07 00:22:26 +00001976 def epilog(self):
1977 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001978
showard775300b2009-09-09 15:30:50 +00001979 if self.success:
1980 return
showard8fe93b52008-11-18 17:53:22 +00001981
showard775300b2009-09-09 15:30:50 +00001982 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001983
showard775300b2009-09-09 15:30:50 +00001984 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001985 # effectively ignore failure for these hosts
1986 self.success = True
showard775300b2009-09-09 15:30:50 +00001987 return
1988
1989 if self.queue_entry:
1990 self.queue_entry.requeue()
1991
1992 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001993 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001994 queue_entry__id=self.queue_entry.id):
1995 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1996 self._fail_queue_entry()
1997 return
1998
showard9bb960b2009-11-19 01:02:11 +00001999 queue_entry = models.HostQueueEntry.objects.get(
2000 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00002001 else:
2002 queue_entry = None
2003
2004 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002005 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002006 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002007 queue_entry=queue_entry,
2008 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002009
showard8fe93b52008-11-18 17:53:22 +00002010
class VerifyTask(PreJobTask):
    """Runs autoserv -v to check a host is in working order before a job."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any queued manual reverifies for this host.  One verify will do
        # and there's no need to keep records of other requests.
        pending_reverifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False, queue_entry=None
        ).exclude(id=self.task.id)
        pending_reverifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002045
2046
class CleanupTask(PreJobTask):
    """Pre-job special task that runs ``autoserv --cleanup`` on a host."""
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        """Mark the host as cleaning (and its queue entry, if any, as
        verifying)."""
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        # Follow-up work is only needed for a successful cleanup that belongs
        # to a queue entry.
        if not (self.queue_entry and self.success):
            return

        host_is_protected = (self.host.protection ==
                             host_protections.Protection.DO_NOT_VERIFY)
        if self.queue_entry.job.run_verify and not host_is_protected:
            # queue a verify against the same queue entry before the job runs
            entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
            models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=self.host.id),
                    queue_entry=entry,
                    task=models.SpecialTask.Task.VERIFY)
        else:
            self.queue_entry.on_pending()


    def epilog(self):
        """On success, clear the host's dirty flag and mark it Ready, then
        schedule any follow-up verify."""
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2094
2095
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        # All entries belong to the same job; use the first one as
        # representative.
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # Keyval file lives inside the job's execution directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution on the drone and
        return the drone-relative path it was written to."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line that runs this job's control file
        against all non-hostless entries' hosts."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        # '-c' tells autoserv this is a client-side job.
        if not self.job.is_server_job():
            params.append('-c')

        return params


    @property
    def num_processes(self):
        # One autoserv process slot per queue entry.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        # All entries must agree on a single execution path.
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write the pre-job keyvals and move every entry to Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leave a marker in the results so the loss of the autoserv process
        # is visible to users/parsers.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job-finished keyvals; no-op if no process ever started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Append an INFO line to the job's status.log, paired with the
        # autoserv process so it lands on the right drone.
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        """Kill the process (via AgentTask.abort), then log and finish."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002228
2229
class QueueTask(AbstractQueueTask):
    """Runs the main autoserv job against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host statuses, then mark hosts running/dirty
        and write per-host keyvals."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        single_host_job = (self.job.synch_count == 1
                           and len(self.queue_entries) == 1)
        if single_host_job:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # hand all entries over to the log-gathering stage
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002261
2262
class HostlessQueueTask(AbstractQueueTask):
    """Runs a job that has no hosts attached (server-side only work)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # There's no host to derive a subdirectory name from, so use a
        # fixed one.
        self.queue_entries[0].update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # no hosts means no gathering/cleanup stage; go straight to parsing
        self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002277
2278
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after the main autoserv job (log
    gathering, parsing, archiving). Attaches a PidfileRunMonitor to the
    finished autoserv process to read its exit status.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor for the (already finished or dead) autoserv process
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command to run against results_dir.  Subclass hook."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if the job's queue entries were aborted.

        All entries should agree; if they don't, send a warning email and
        assume aborted rather than crashing.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: the original joined the characters of a single
                # formatted string; list the state of every entry instead.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2354
2355
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries)
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', hostnames, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule cleanups (reboots) or mark hosts Ready, depending on the
        job's reboot_after policy and the autoserv outcome."""
        if self._autoserv_monitor.has_process():
            completed = (self._final_status() ==
                         models.HostQueueEntry.Status.COMPLETED)
            failed_test_count = self._autoserv_monitor.num_tests_failed()
        else:
            completed = False
            failed_test_count = 0

        reboot_after = self._job.reboot_after
        aborted = (self._final_status() ==
                   models.HostQueueEntry.Status.ABORTED)
        all_tests_passed = completed and failed_test_count == 0
        # always reboot after aborted jobs
        do_reboot = (aborted
                     or reboot_after == model_attributes.RebootAfter.ALWAYS
                     or (reboot_after ==
                         model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                         and all_tests_passed))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is not None and not os.WIFSIGNALED(exit_code):
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002441
2442
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # class-wide count of processes currently running for this task type
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        """Subclass hook: the cap on simultaneous processes."""
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # actually run the command
            super(SelfThrottledPostJobTask, self).run()
            if self._process_started():
                self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002502
showard21baa452008-10-21 00:08:39 +00002503
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO results parser over a finished job's results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # once parsing is done, hand the entries off to archiving
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002542
2543
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Uploads a finished job's results by running the archiving control
    file through autoserv against the existing results directory."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            # leave a marker file in the results dir recording the failure
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            message = ('Archiving failed with exit code %s'
                       % self.monitor.exit_code())
            _drone_manager.write_lines_to_file(
                    failed_file, [message],
                    paired_with_process=self._paired_with_monitor().get_process())
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002590
2591
if __name__ == '__main__':
    # Script entry point: main() is defined earlier in this file and runs
    # the scheduler loop.
    main()