#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats, metahost_scheduler
from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_drone_manager = None


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
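# Illustration (value assumed, not a shipped default): with
# pidfile_timeout_mins = 5 in the SCHEDULER config section, autoserv gets
# 300 seconds to write its pidfile before it is treated as lost.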
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
jamesrenc44ae992010-02-19 00:12:54 +000072get_site_metahost_schedulers = utils.import_site_function(
jamesren883492a2010-02-12 00:45:18 +000073 __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
jamesrenc44ae992010-02-19 00:12:54 +000074 'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
jamesren76fcf192010-04-21 20:39:50 +000077def _verify_default_drone_set_exists():
78 if (models.DroneSet.drone_sets_enabled() and
79 not models.DroneSet.default_drone_set_name()):
80 raise SchedulerError('Drone sets are enabled, but no default is set')
81
82
83def _sanity_check():
84 """Make sure the configs are consistent before starting the scheduler"""
85 _verify_default_drone_set_exists()
86
87
mbligh36768f02008-02-22 18:28:33 +000088def main():
showard27f33872009-04-07 18:20:53 +000089 try:
showard549afad2009-08-20 23:33:36 +000090 try:
91 main_without_exception_handling()
92 except SystemExit:
93 raise
94 except:
95 logging.exception('Exception escaping in monitor_db')
96 raise
97 finally:
98 utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000099
100
101def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +0000102 setup_logging()
mbligh36768f02008-02-22 18:28:33 +0000103
showard136e6dc2009-06-10 19:38:49 +0000104 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +0000105 parser = optparse.OptionParser(usage)
106 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
107 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +0000108 parser.add_option('--test', help='Indicate that scheduler is under ' +
109 'test and should use dummy autoserv and no parsing',
110 action='store_true')
111 (options, args) = parser.parse_args()
112 if len(args) != 1:
113 parser.print_usage()
114 return
mbligh36768f02008-02-22 18:28:33 +0000115
showard5613c662009-06-08 23:30:33 +0000116 scheduler_enabled = global_config.global_config.get_config_value(
117 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
118
119 if not scheduler_enabled:
120 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
121 "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
            "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
            _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)


    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
                "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
                DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma-separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
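# Illustrative sketch (hypothetical machine names and extra argument):
#   _autoserv_command_line('host1,host2', ['--example-extra-arg'], job=job)
# returns
#   [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#    '-m', 'host1,host2', '-u', job.owner, '-l', job.name,
#    '--verbose', '--example-extra-arg']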


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
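    # Shape sketch (illustrative ids): rows ((1, 10), (1, 11), (2, 10))
    # map to {1: set([10, 11]), 2: set([10])}; with flip=True the columns
    # swap, yielding {10: set([1, 2]), 11: set([1])}.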


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True
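    # Illustration (label name hypothetical): a host with an only_if_needed
    # label 'rare-hardware' is eligible only for jobs that either request
    # that label as their metahost or list it as a dependency; all other
    # jobs skip the host so it stays free for jobs that actually need it.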


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. If more than one atomic group is
        found among the labels, an error is logged and one of them is
        returned arbitrarily.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Look up the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
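    # Worked sketch (numbers assumed): for a job with synch_count=3 in an
    # atomic group with max_number_of_machines=5, a label with 7 eligible
    # hosts passes the min_hosts check and the first 5 sorted hosts are
    # claimed; a label with only 2 eligible hosts is skipped and the next
    # label belonging to the atomic group is tried.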


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


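    # One scheduler cycle: collect garbage stats, refresh drone state, run
    # periodic cleanups, process aborts and recurring runs, schedule work
    # (delay tasks, running entries, special tasks, new jobs), drive agents,
    # then flush drone actions and queued emails.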
    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying
        # is handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is
        # for that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
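    # Ordering sketch: queued tasks [VERIFY, REPAIR, CLEANUP] come back as
    # [REPAIR, CLEANUP, VERIFY]; sorted() is stable, so tasks of the same
    # type keep their original relative order.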


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
                joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
                where='NOT complete AND NOT active AND status="Queued"',
                order_by='afe_jobs.priority DESC, meta_host, job_id'))
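        # (meta_host sorts ascending here; it is NULL for non-metahost
        # entries, and MySQL orders NULLs first, which is what places
        # non-metahost entries ahead of metahosts.)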


    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
1165
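    # Worked example of the throttling above (numbers invented for
    # illustration, not taken from any real config): with
    # max_processes_started_per_cycle = 50 and 45 processes already started
    # this cycle, a 10-process agent is rejected (45 + 10 > 50); a 10-process
    # agent polled while num_started_this_cycle == 0 still runs, so agents
    # larger than the per-cycle limit cannot be starved forever.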
1166
jadmanski0afbb632008-06-06 21:10:57 +00001167 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001168 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001169 have_reached_limit = False
1170 # iterate over copy, so we can remove agents during iteration
1171 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001172 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001173 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001174 have_reached_limit):
1175 have_reached_limit = True
1176 continue
showardd1195652009-12-08 22:21:02 +00001177 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001178 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001179 if agent.is_done():
1180 logging.info("agent finished")
1181 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001182 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001183 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001184
1185
showard29f7cd22009-04-29 21:16:24 +00001186 def _process_recurring_runs(self):
1187 recurring_runs = models.RecurringRun.objects.filter(
1188 start_date__lte=datetime.datetime.now())
1189 for rrun in recurring_runs:
1190 # Create job from template
1191 job = rrun.job
1192 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001193 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001194
1195 host_objects = info['hosts']
1196 one_time_hosts = info['one_time_hosts']
1197 metahost_objects = info['meta_hosts']
1198 dependencies = info['dependencies']
1199 atomic_group = info['atomic_group']
1200
1201 for host in one_time_hosts or []:
1202 this_host = models.Host.create_one_time_host(host.hostname)
1203 host_objects.append(this_host)
1204
1205 try:
1206 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001207 options=options,
showard29f7cd22009-04-29 21:16:24 +00001208 host_objects=host_objects,
1209 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001210 atomic_group=atomic_group)
1211
1212 except Exception, ex:
1213 logging.exception(ex)
1214                # TODO: send email
1215
1216 if rrun.loop_count == 1:
1217 rrun.delete()
1218 else:
1219 if rrun.loop_count != 0: # if not infinite loop
1220 # calculate new start_date
1221 difference = datetime.timedelta(seconds=rrun.loop_period)
1222 rrun.start_date = rrun.start_date + difference
1223 rrun.loop_count -= 1
1224 rrun.save()
1225
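    # Worked example of the loop bookkeeping above (values invented for
    # illustration): a RecurringRun with loop_count=3 and loop_period=86400
    # is run now, then start_date moves forward one day and loop_count drops
    # to 2; a row with loop_count=1 is deleted after its final run, and
    # loop_count=0 means the run recurs indefinitely.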
1226
showard170873e2009-01-07 00:22:26 +00001227class PidfileRunMonitor(object):
1228 """
1229 Client must call either run() to start a new process or
1230 attach_to_existing_process().
1231 """
mbligh36768f02008-02-22 18:28:33 +00001232
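    # Hedged usage sketch (the argument values below are invented for
    # illustration; AUTOSERV_NICE_LEVEL and _drone_manager are real module
    # globals):
    #
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command, '/results/123-owner', num_processes=1,
    #               nice_level=AUTOSERV_NICE_LEVEL, username='owner')
    #   ...                      # then, once per scheduler cycle:
    #   if monitor.exit_code() is not None:
    #       success = (monitor.exit_code() == 0)
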
showard170873e2009-01-07 00:22:26 +00001233 class _PidfileException(Exception):
1234 """
1235 Raised when there's some unexpected behavior with the pid file, but only
1236 used internally (never allowed to escape this class).
1237 """
mbligh36768f02008-02-22 18:28:33 +00001238
1239
showard170873e2009-01-07 00:22:26 +00001240 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001241 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001242 self._start_time = None
1243 self.pidfile_id = None
1244 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001245
1246
showard170873e2009-01-07 00:22:26 +00001247 def _add_nice_command(self, command, nice_level):
1248 if not nice_level:
1249 return command
1250 return ['nice', '-n', str(nice_level)] + command
1251
1252
1253 def _set_start_time(self):
1254 self._start_time = time.time()
1255
1256
showard418785b2009-11-23 20:19:59 +00001257 def run(self, command, working_directory, num_processes, nice_level=None,
1258 log_file=None, pidfile_name=None, paired_with_pidfile=None,
jamesren76fcf192010-04-21 20:39:50 +00001259 username=None, drone_hostnames_allowed=None):
showard170873e2009-01-07 00:22:26 +00001260 assert command is not None
1261        command = self._add_nice_command(command, nice_level)
1263 self._set_start_time()
1264 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001265 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001266 num_processes=num_processes, log_file=log_file,
jamesren76fcf192010-04-21 20:39:50 +00001267 paired_with_pidfile=paired_with_pidfile, username=username,
1268 drone_hostnames_allowed=drone_hostnames_allowed)
showard170873e2009-01-07 00:22:26 +00001269
1270
showarded2afea2009-07-07 20:54:07 +00001271 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001272 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001273 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001274 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001275 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001276 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001277 if num_processes is not None:
1278 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001279
1280
jadmanski0afbb632008-06-06 21:10:57 +00001281 def kill(self):
showard170873e2009-01-07 00:22:26 +00001282 if self.has_process():
1283 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001284
mbligh36768f02008-02-22 18:28:33 +00001285
showard170873e2009-01-07 00:22:26 +00001286 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001287 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001288 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001289
1290
showard170873e2009-01-07 00:22:26 +00001291 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001292 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001293 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001294 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001295
1296
showard170873e2009-01-07 00:22:26 +00001297 def _read_pidfile(self, use_second_read=False):
1298 assert self.pidfile_id is not None, (
1299 'You must call run() or attach_to_existing_process()')
1300 contents = _drone_manager.get_pidfile_contents(
1301 self.pidfile_id, use_second_read=use_second_read)
1302 if contents.is_invalid():
1303 self._state = drone_manager.PidfileContents()
1304 raise self._PidfileException(contents)
1305 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001306
1307
showard21baa452008-10-21 00:08:39 +00001308 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001309 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1310 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001311 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001312 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001313
1314
1315 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001316 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001317 return
mblighbb421852008-03-11 22:36:16 +00001318
showard21baa452008-10-21 00:08:39 +00001319 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001320
showard170873e2009-01-07 00:22:26 +00001321 if self._state.process is None:
1322 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001323 return
mbligh90a549d2008-03-25 23:52:34 +00001324
showard21baa452008-10-21 00:08:39 +00001325 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001326 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001327 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001328 return
mbligh90a549d2008-03-25 23:52:34 +00001329
showard170873e2009-01-07 00:22:26 +00001330 # pid but no running process - maybe process *just* exited
1331 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001332 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001333 # autoserv exited without writing an exit code
1334 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001335 self._handle_pidfile_error(
1336 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001337
showard21baa452008-10-21 00:08:39 +00001338
1339 def _get_pidfile_info(self):
1340 """\
1341 After completion, self._state will contain:
1342 pid=None, exit_status=None if autoserv has not yet run
1343 pid!=None, exit_status=None if autoserv is running
1344 pid!=None, exit_status!=None if autoserv has completed
1345 """
1346 try:
1347 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001348 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001349 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001350
1351
showard170873e2009-01-07 00:22:26 +00001352 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001353 """\
1354 Called when no pidfile is found or no pid is in the pidfile.
1355 """
showard170873e2009-01-07 00:22:26 +00001356 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001357 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001358 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001359 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001360 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001361
1362
showard35162b02009-03-03 02:17:30 +00001363 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001364 """\
1365 Called when autoserv has exited without writing an exit status,
1366 or we've timed out waiting for autoserv to write a pid to the
1367 pidfile. In either case, we just return failure and the caller
1368 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001369
showard170873e2009-01-07 00:22:26 +00001370 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001371 """
1372 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001373 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001374 self._state.exit_status = 1
1375 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001376
1377
jadmanski0afbb632008-06-06 21:10:57 +00001378 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001379 self._get_pidfile_info()
1380 return self._state.exit_status
1381
1382
1383 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001384 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001385 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001386 if self._state.num_tests_failed is None:
1387 return -1
showard21baa452008-10-21 00:08:39 +00001388 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001389
1390
showardcdaeae82009-08-31 18:32:48 +00001391 def try_copy_results_on_drone(self, **kwargs):
1392 if self.has_process():
1393 # copy results logs into the normal place for job results
1394 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1395
1396
1397 def try_copy_to_results_repository(self, source, **kwargs):
1398 if self.has_process():
1399 _drone_manager.copy_to_results_repository(self.get_process(),
1400 source, **kwargs)
1401
1402
mbligh36768f02008-02-22 18:28:33 +00001403class Agent(object):
showard77182562009-06-10 00:16:05 +00001404 """
showard8cc058f2009-09-08 16:26:33 +00001405 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001406
1407 The following methods are required on all task objects:
1408 poll() - Called periodically to let the task check its status and
1409             update its internal state.
1410 is_done() - Returns True if the task is finished.
1411 abort() - Called when an abort has been requested. The task must
1412 set its aborted attribute to True if it actually aborted.
1413
1414 The following attributes are required on all task objects:
1415 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001416 success - bool, True if this task succeeded.
1417 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1418 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001419 """
1420
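    # Minimal conforming task, purely as a sketch of the interface above
    # (this class is hypothetical and does not exist in the scheduler):
    #
    #   class NoopTask(object):
    #       queue_entry_ids = ()
    #       host_ids = ()
    #       aborted = False
    #       success = None
    #       num_processes = 0    # zero-process agents are always startable
    #       def poll(self):
    #           self.success = True
    #       def is_done(self):
    #           return self.success is not None
    #       def abort(self):
    #           self.aborted = True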
1421
showard418785b2009-11-23 20:19:59 +00001422 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001423 """
showard8cc058f2009-09-08 16:26:33 +00001424 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001425 """
showard8cc058f2009-09-08 16:26:33 +00001426 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001427
showard77182562009-06-10 00:16:05 +00001428 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001429 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001430
showard8cc058f2009-09-08 16:26:33 +00001431 self.queue_entry_ids = task.queue_entry_ids
1432 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001433
showard8cc058f2009-09-08 16:26:33 +00001434 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001435 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001436
1437
jadmanski0afbb632008-06-06 21:10:57 +00001438 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001439 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001440 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001441 self.task.poll()
1442 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001443 self.finished = True
showardec113162008-05-08 00:52:49 +00001444
1445
jadmanski0afbb632008-06-06 21:10:57 +00001446 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001447 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001448
1449
showardd3dc1992009-04-22 21:01:40 +00001450 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001451 if self.task:
1452 self.task.abort()
1453 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001454 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001455 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001456
showardd3dc1992009-04-22 21:01:40 +00001457
mbligh36768f02008-02-22 18:28:33 +00001458class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001459 class _NullMonitor(object):
1460 pidfile_id = None
1461
1462 def has_process(self):
1463 return True
1464
1465
1466 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001467 """
showardd1195652009-12-08 22:21:02 +00001468 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001469 """
jadmanski0afbb632008-06-06 21:10:57 +00001470 self.done = False
showardd1195652009-12-08 22:21:02 +00001471 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001472 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001473 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001474 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001475 self.queue_entry_ids = []
1476 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001477 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001478
1479
1480 def _set_ids(self, host=None, queue_entries=None):
1481 if queue_entries and queue_entries != [None]:
1482 self.host_ids = [entry.host.id for entry in queue_entries]
1483 self.queue_entry_ids = [entry.id for entry in queue_entries]
1484 else:
1485 assert host
1486 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001487
1488
jadmanski0afbb632008-06-06 21:10:57 +00001489 def poll(self):
showard08a36412009-05-05 01:01:13 +00001490 if not self.started:
1491 self.start()
showardd1195652009-12-08 22:21:02 +00001492 if not self.done:
1493 self.tick()
showard08a36412009-05-05 01:01:13 +00001494
1495
1496 def tick(self):
showardd1195652009-12-08 22:21:02 +00001497 assert self.monitor
1498 exit_code = self.monitor.exit_code()
1499 if exit_code is None:
1500 return
mbligh36768f02008-02-22 18:28:33 +00001501
showardd1195652009-12-08 22:21:02 +00001502 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001503 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001504
1505
jadmanski0afbb632008-06-06 21:10:57 +00001506 def is_done(self):
1507 return self.done
mbligh36768f02008-02-22 18:28:33 +00001508
1509
jadmanski0afbb632008-06-06 21:10:57 +00001510 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001511 if self.done:
showardd1195652009-12-08 22:21:02 +00001512 assert self.started
showard08a36412009-05-05 01:01:13 +00001513 return
showardd1195652009-12-08 22:21:02 +00001514 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001515 self.done = True
1516 self.success = success
1517 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001518
1519
jadmanski0afbb632008-06-06 21:10:57 +00001520 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001521 """
1522 To be overridden.
1523 """
showarded2afea2009-07-07 20:54:07 +00001524 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001525 self.register_necessary_pidfiles()
1526
1527
1528 def _log_file(self):
1529 if not self._log_file_name:
1530 return None
1531 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001532
mbligh36768f02008-02-22 18:28:33 +00001533
jadmanski0afbb632008-06-06 21:10:57 +00001534 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001535 log_file = self._log_file()
1536 if self.monitor and log_file:
1537 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001538
1539
jadmanski0afbb632008-06-06 21:10:57 +00001540 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001541 """
1542 To be overridden.
1543 """
jadmanski0afbb632008-06-06 21:10:57 +00001544 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001545 logging.info("%s finished with success=%s", type(self).__name__,
1546 self.success)
mbligh36768f02008-02-22 18:28:33 +00001548
1549
jadmanski0afbb632008-06-06 21:10:57 +00001550 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001551 if not self.started:
1552 self.prolog()
1553 self.run()
1554
1555 self.started = True
1556
1557
1558 def abort(self):
1559 if self.monitor:
1560 self.monitor.kill()
1561 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001562 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001563 self.cleanup()
1564
1565
showarded2afea2009-07-07 20:54:07 +00001566 def _get_consistent_execution_path(self, execution_entries):
1567 first_execution_path = execution_entries[0].execution_path()
1568 for execution_entry in execution_entries[1:]:
1569 assert execution_entry.execution_path() == first_execution_path, (
1570 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1571 execution_entry,
1572 first_execution_path,
1573 execution_entries[0]))
1574 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001575
1576
showarded2afea2009-07-07 20:54:07 +00001577 def _copy_results(self, execution_entries, use_monitor=None):
1578 """
1579 @param execution_entries: list of objects with execution_path() method
1580 """
showard6d1c1432009-08-20 23:30:39 +00001581 if use_monitor is not None and not use_monitor.has_process():
1582 return
1583
showarded2afea2009-07-07 20:54:07 +00001584 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001585 if use_monitor is None:
1586 assert self.monitor
1587 use_monitor = self.monitor
1588 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001589 execution_path = self._get_consistent_execution_path(execution_entries)
1590 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001591 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001592
showarda1e74b32009-05-12 17:32:04 +00001593
1594 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001595 for queue_entry in queue_entries:
1596 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001597
1598
mbligh4608b002010-01-05 18:22:35 +00001599 def _archive_results(self, queue_entries):
1600 for queue_entry in queue_entries:
1601 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001602
1603
showardd1195652009-12-08 22:21:02 +00001604 def _command_line(self):
1605 """
1606 Return the command line to run. Must be overridden.
1607 """
1608 raise NotImplementedError
1609
1610
1611 @property
1612 def num_processes(self):
1613 """
1614 Return the number of processes forked by this AgentTask's process. It
1615 may only be approximate. To be overridden if necessary.
1616 """
1617 return 1
1618
1619
1620 def _paired_with_monitor(self):
1621 """
1622 If this AgentTask's process must run on the same machine as some
1623 previous process, this method should be overridden to return a
1624 PidfileRunMonitor for that process.
1625 """
1626 return self._NullMonitor()
1627
1628
1629 @property
1630 def owner_username(self):
1631 """
1632 Return login of user responsible for this task. May be None. Must be
1633 overridden.
1634 """
1635 raise NotImplementedError
1636
1637
1638 def _working_directory(self):
1639 """
1640 Return the directory where this AgentTask's process executes. Must be
1641 overridden.
1642 """
1643 raise NotImplementedError
1644
1645
1646 def _pidfile_name(self):
1647 """
1648 Return the name of the pidfile this AgentTask's process uses. To be
1649 overridden if necessary.
1650 """
jamesrenc44ae992010-02-19 00:12:54 +00001651 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001652
1653
1654 def _check_paired_results_exist(self):
1655 if not self._paired_with_monitor().has_process():
1656 email_manager.manager.enqueue_notify_email(
1657 'No paired results in task',
1658 'No paired results in task %s at %s'
1659 % (self, self._paired_with_monitor().pidfile_id))
1660 self.finished(False)
1661 return False
1662 return True
1663
1664
1665 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001666 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001667 self.monitor = PidfileRunMonitor()
1668
1669
1670 def run(self):
1671 if not self._check_paired_results_exist():
1672 return
1673
1674 self._create_monitor()
1675 self.monitor.run(
1676 self._command_line(), self._working_directory(),
1677 num_processes=self.num_processes,
1678 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1679 pidfile_name=self._pidfile_name(),
1680 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001681 username=self.owner_username,
1682 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1683
1684
1685 def get_drone_hostnames_allowed(self):
1686 if not models.DroneSet.drone_sets_enabled():
1687 return None
1688
1689 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1690 if not hqes:
1691 # Only special tasks could be missing host queue entries
1692 assert isinstance(self, SpecialAgentTask)
1693 return self._user_or_global_default_drone_set(
1694 self.task, self.task.requested_by)
1695
1696 job_ids = hqes.values_list('job', flat=True).distinct()
1697 assert job_ids.count() == 1, ("AgentTask's queue entries "
1698 "span multiple jobs")
1699
1700 job = models.Job.objects.get(id=job_ids[0])
1701 drone_set = job.drone_set
1702 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001703 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001704
1705 return drone_set.get_drone_hostnames()
1706
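    # Resolution order implemented above: the job's own drone set when set,
    # otherwise the owning user's default drone set, otherwise the global
    # default. Special tasks, which have no queue entries, resolve against
    # their requesting user instead of a job.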
1707
1708 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1709 """
1710 Returns the user's default drone set, if present.
1711
1712 Otherwise, returns the global default drone set.
1713 """
1714 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1715 if not user:
1716 logging.warn('%s had no owner; using default drone set',
1717 obj_with_owner)
1718 return default_hostnames
1719 if not user.drone_set:
1720 logging.warn('User %s has no default drone set, using global '
1721 'default', user.login)
1722 return default_hostnames
1723 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001724
1725
1726 def register_necessary_pidfiles(self):
1727 pidfile_id = _drone_manager.get_pidfile_id_from(
1728 self._working_directory(), self._pidfile_name())
1729 _drone_manager.register_pidfile(pidfile_id)
1730
1731 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1732 if paired_pidfile_id:
1733 _drone_manager.register_pidfile(paired_pidfile_id)
1734
1735
1736 def recover(self):
1737 if not self._check_paired_results_exist():
1738 return
1739
1740 self._create_monitor()
1741 self.monitor.attach_to_existing_process(
1742 self._working_directory(), pidfile_name=self._pidfile_name(),
1743 num_processes=self.num_processes)
1744 if not self.monitor.has_process():
1745 # no process to recover; wait to be started normally
1746 self.monitor = None
1747 return
1748
1749 self.started = True
1750 logging.info('Recovering process %s for %s at %s'
1751 % (self.monitor.get_process(), type(self).__name__,
1752 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001753
1754
mbligh4608b002010-01-05 18:22:35 +00001755 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1756 allowed_host_statuses=None):
1757 for entry in queue_entries:
1758 if entry.status not in allowed_hqe_statuses:
1759 raise SchedulerError('Queue task attempting to start '
1760 'entry with invalid status %s: %s'
1761 % (entry.status, entry))
1762 invalid_host_status = (
1763 allowed_host_statuses is not None
1764 and entry.host.status not in allowed_host_statuses)
1765 if invalid_host_status:
1766 raise SchedulerError('Queue task attempting to start on queue '
1767 'entry with invalid host status %s: %s'
1768 % (entry.host.status, entry))
1769
1770
showardd9205182009-04-27 20:09:55 +00001771class TaskWithJobKeyvals(object):
1772 """AgentTask mixin providing functionality to help with job keyval files."""
1773 _KEYVAL_FILE = 'keyval'
1774 def _format_keyval(self, key, value):
1775 return '%s=%s' % (key, value)
1776
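    # The keyval file is plain key=value lines, one per call, e.g. (values
    # invented for illustration):
    #
    #   job_queued=1262563200
    #   job_finished=1262566800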
1777
1778 def _keyval_path(self):
1779 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001780 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001781
1782
1783 def _write_keyval_after_job(self, field, value):
1784 assert self.monitor
1785 if not self.monitor.has_process():
1786 return
1787 _drone_manager.write_lines_to_file(
1788 self._keyval_path(), [self._format_keyval(field, value)],
1789 paired_with_process=self.monitor.get_process())
1790
1791
1792 def _job_queued_keyval(self, job):
1793 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1794
1795
1796 def _write_job_finished(self):
1797 self._write_keyval_after_job("job_finished", int(time.time()))
1798
1799
showarddb502762009-09-09 15:31:20 +00001800 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1801 keyval_contents = '\n'.join(self._format_keyval(key, value)
1802 for key, value in keyval_dict.iteritems())
1803 # always end with a newline to allow additional keyvals to be written
1804 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001805 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001806 keyval_contents,
1807 file_path=keyval_path)
1808
1809
1810 def _write_keyvals_before_job(self, keyval_dict):
1811 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1812
1813
1814 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001815 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001816 host.hostname)
1817 platform, all_labels = host.platform_and_labels()
1818 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1819 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1820
1821
showard8cc058f2009-09-08 16:26:33 +00001822class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001823 """
1824 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1825 """
1826
1827 TASK_TYPE = None
1828 host = None
1829 queue_entry = None
1830
showardd1195652009-12-08 22:21:02 +00001831 def __init__(self, task, extra_command_args):
1832 super(SpecialAgentTask, self).__init__()
1833
lmrb7c5d272010-04-16 06:34:04 +00001834 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001835
jamesrenc44ae992010-02-19 00:12:54 +00001836 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001837 self.queue_entry = None
1838 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001839 self.queue_entry = scheduler_models.HostQueueEntry(
1840 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001841
showarded2afea2009-07-07 20:54:07 +00001842 self.task = task
1843 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001844
1845
showard8cc058f2009-09-08 16:26:33 +00001846 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001847 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1848
1849
1850 def _command_line(self):
1851 return _autoserv_command_line(self.host.hostname,
1852 self._extra_command_args,
1853 queue_entry=self.queue_entry)
1854
1855
1856 def _working_directory(self):
1857 return self.task.execution_path()
1858
1859
1860 @property
1861 def owner_username(self):
1862 if self.task.requested_by:
1863 return self.task.requested_by.login
1864 return None
showard8cc058f2009-09-08 16:26:33 +00001865
1866
showarded2afea2009-07-07 20:54:07 +00001867 def prolog(self):
1868 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001869 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001870 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001871
1872
showardde634ee2009-01-30 01:44:24 +00001873 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001874 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001875
showard2fe3f1d2009-07-06 20:19:11 +00001876 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001877 return # don't fail metahost entries, they'll be reassigned
1878
showard2fe3f1d2009-07-06 20:19:11 +00001879 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001880 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001881 return # entry has been aborted
1882
showard2fe3f1d2009-07-06 20:19:11 +00001883 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001884 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001885 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001886 self._write_keyval_after_job(queued_key, queued_time)
1887 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001888
showard8cc058f2009-09-08 16:26:33 +00001889 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001890 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001891 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001892 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001893
showard8cc058f2009-09-08 16:26:33 +00001894 pidfile_id = _drone_manager.get_pidfile_id_from(
1895 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001896 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001897 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001898
1899 if self.queue_entry.job.parse_failed_repair:
1900 self._parse_results([self.queue_entry])
1901 else:
1902 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001903
1904
1905 def cleanup(self):
1906 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001907
1908 # We will consider an aborted task to be "Failed"
1909 self.task.finish(bool(self.success))
1910
showardf85a0b72009-10-07 20:48:45 +00001911 if self.monitor:
1912 if self.monitor.has_process():
1913 self._copy_results([self.task])
1914 if self.monitor.pidfile_id is not None:
1915 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001916
1917
1918class RepairTask(SpecialAgentTask):
1919 TASK_TYPE = models.SpecialTask.Task.REPAIR
1920
1921
showardd1195652009-12-08 22:21:02 +00001922 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001923 """\
1924        task: the repair SpecialTask; its queue entry, if any, is marked
            failed if this repair fails.
1925 """
1926 protection = host_protections.Protection.get_string(
1927 task.host.protection)
1928 # normalize the protection name
1929 protection = host_protections.Protection.get_attr_name(protection)
1930
1931 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001932 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001933
1934 # *don't* include the queue entry in IDs -- if the queue entry is
1935 # aborted, we want to leave the repair task running
1936 self._set_ids(host=self.host)
1937
1938
1939 def prolog(self):
1940 super(RepairTask, self).prolog()
1941 logging.info("repair_task starting")
1942 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001943
1944
jadmanski0afbb632008-06-06 21:10:57 +00001945 def epilog(self):
1946 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001947
jadmanski0afbb632008-06-06 21:10:57 +00001948 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001949 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001950 else:
showard8cc058f2009-09-08 16:26:33 +00001951 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001952 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001953 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001954
1955
showarded2afea2009-07-07 20:54:07 +00001956class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001957 def _copy_to_results_repository(self):
1958 if not self.queue_entry or self.queue_entry.meta_host:
1959 return
1960
1961 self.queue_entry.set_execution_subdir()
1962 log_name = os.path.basename(self.task.execution_path())
1963 source = os.path.join(self.task.execution_path(), 'debug',
1964 'autoserv.DEBUG')
1965 destination = os.path.join(
1966 self.queue_entry.execution_path(), log_name)
1967
1968 self.monitor.try_copy_to_results_repository(
1969 source, destination_path=destination)
1970
1971
showard170873e2009-01-07 00:22:26 +00001972 def epilog(self):
1973 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001974
showard775300b2009-09-09 15:30:50 +00001975 if self.success:
1976 return
showard8fe93b52008-11-18 17:53:22 +00001977
showard775300b2009-09-09 15:30:50 +00001978 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001979
showard775300b2009-09-09 15:30:50 +00001980 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001981 # effectively ignore failure for these hosts
1982 self.success = True
showard775300b2009-09-09 15:30:50 +00001983 return
1984
1985 if self.queue_entry:
1986 self.queue_entry.requeue()
1987
1988 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001989 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001990 queue_entry__id=self.queue_entry.id):
1991 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1992 self._fail_queue_entry()
1993 return
1994
showard9bb960b2009-11-19 01:02:11 +00001995 queue_entry = models.HostQueueEntry.objects.get(
1996 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001997 else:
1998 queue_entry = None
1999
2000 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002001 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00002002 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00002003 queue_entry=queue_entry,
2004 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00002005
showard8fe93b52008-11-18 17:53:22 +00002006
2007class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002008 TASK_TYPE = models.SpecialTask.Task.VERIFY
2009
2010
showardd1195652009-12-08 22:21:02 +00002011 def __init__(self, task):
2012 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00002013 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00002014
2015
jadmanski0afbb632008-06-06 21:10:57 +00002016 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002017 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00002018
showardb18134f2009-03-20 20:52:18 +00002019 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002020 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00002021 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2022 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00002023
showarded2afea2009-07-07 20:54:07 +00002024 # Delete any other queued verifies for this host. One verify will do
2025 # and there's no need to keep records of other requests.
2026 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00002027 host__id=self.host.id,
2028 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00002029 is_active=False, is_complete=False)
2030 queued_verifies = queued_verifies.exclude(id=self.task.id)
2031 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00002032
mbligh36768f02008-02-22 18:28:33 +00002033
jadmanski0afbb632008-06-06 21:10:57 +00002034 def epilog(self):
2035 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002036 if self.success:
showard8cc058f2009-09-08 16:26:33 +00002037 if self.queue_entry:
2038 self.queue_entry.on_pending()
2039 else:
2040 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00002041
2042
mbligh4608b002010-01-05 18:22:35 +00002043class CleanupTask(PreJobTask):
2044 # note this can also run post-job, but when it does, it's running standalone
2045 # against the host (not related to the job), so it's not considered a
2046 # PostJobTask
2047
2048 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2049
2050
2051 def __init__(self, task, recover_run_monitor=None):
2052 super(CleanupTask, self).__init__(task, ['--cleanup'])
2053 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2054
2055
2056 def prolog(self):
2057 super(CleanupTask, self).prolog()
2058 logging.info("starting cleanup task for host: %s", self.host.hostname)
2059 self.host.set_status(models.Host.Status.CLEANING)
2060 if self.queue_entry:
2061 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2062
2063
2064 def _finish_epilog(self):
2065 if not self.queue_entry or not self.success:
2066 return
2067
2068 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2069 should_run_verify = (
2070 self.queue_entry.job.run_verify
2071 and self.host.protection != do_not_verify_protection)
2072 if should_run_verify:
2073 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2074 models.SpecialTask.objects.create(
2075 host=models.Host.objects.get(id=self.host.id),
2076 queue_entry=entry,
2077 task=models.SpecialTask.Task.VERIFY)
2078 else:
2079 self.queue_entry.on_pending()
2080
2081
2082 def epilog(self):
2083 super(CleanupTask, self).epilog()
2084
2085 if self.success:
2086 self.host.update_field('dirty', 0)
2087 self.host.set_status(models.Host.Status.READY)
2088
2089 self._finish_epilog()
2090
2091
showarda9545c02009-12-18 22:44:26 +00002092class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2093 """
2094 Common functionality for QueueTask and HostlessQueueTask
2095 """
2096 def __init__(self, queue_entries):
2097 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002098 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002099 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002100
2101
showard73ec0442009-02-07 02:05:20 +00002102 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002103 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002104
2105
jamesrenc44ae992010-02-19 00:12:54 +00002106 def _write_control_file(self, execution_path):
2107 control_path = _drone_manager.attach_file_to_execution(
2108 execution_path, self.job.control_file)
2109 return control_path
2110
2111
showardd1195652009-12-08 22:21:02 +00002112 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002113 execution_path = self.queue_entries[0].execution_path()
2114 control_path = self._write_control_file(execution_path)
2115 hostnames = ','.join(entry.host.hostname
2116 for entry in self.queue_entries
2117 if not entry.is_hostless())
2118
2119 execution_tag = self.queue_entries[0].execution_tag()
2120 params = _autoserv_command_line(
2121 hostnames,
2122 ['-P', execution_tag, '-n',
2123 _drone_manager.absolute_path(control_path)],
2124 job=self.job, verbose=False)
2125
2126 if not self.job.is_server_job():
2127 params.append('-c')
2128
2129 return params
showardd1195652009-12-08 22:21:02 +00002130
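    # The assembled command has roughly this shape (illustrative only; the
    # exact flags come from _autoserv_command_line, defined elsewhere in this
    # file):
    #
    #   autoserv ... -P <execution tag> -n <control file path>
    #
    # with '-c' appended for client-side jobs only.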
2131
2132 @property
2133 def num_processes(self):
2134 return len(self.queue_entries)
2135
2136
2137 @property
2138 def owner_username(self):
2139 return self.job.owner
2140
2141
2142 def _working_directory(self):
2143 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002144
2145
jadmanski0afbb632008-06-06 21:10:57 +00002146 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002147 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002148 keyval_dict = self.job.keyval_dict()
2149 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002150 group_name = self.queue_entries[0].get_group_name()
2151 if group_name:
2152 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002153 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002154 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002155 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002156 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002157
2158
showard35162b02009-03-03 02:17:30 +00002159 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002160 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002161 _drone_manager.write_lines_to_file(error_file_path,
2162 [_LOST_PROCESS_ERROR])
2163
2164
showardd3dc1992009-04-22 21:01:40 +00002165 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002166 if not self.monitor:
2167 return
2168
showardd9205182009-04-27 20:09:55 +00002169 self._write_job_finished()
2170
showard35162b02009-03-03 02:17:30 +00002171 if self.monitor.lost_process:
2172 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002173
jadmanskif7fa2cc2008-10-01 14:13:23 +00002174
showardcbd74612008-11-19 21:42:02 +00002175 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002176 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002177 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002178 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002179 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002180
2181
jadmanskif7fa2cc2008-10-01 14:13:23 +00002182 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002183 if not self.monitor or not self.monitor.has_process():
2184 return
2185
jadmanskif7fa2cc2008-10-01 14:13:23 +00002186 # build up sets of all the aborted_by and aborted_on values
2187 aborted_by, aborted_on = set(), set()
2188 for queue_entry in self.queue_entries:
2189 if queue_entry.aborted_by:
2190 aborted_by.add(queue_entry.aborted_by)
2191 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2192 aborted_on.add(t)
2193
2194 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002195 # TODO(showard): this conditional is now obsolete, we just need to leave
2196 # it in temporarily for backwards compatibility over upgrades. delete
2197 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002198 assert len(aborted_by) <= 1
2199 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002200 aborted_by_value = aborted_by.pop()
2201 aborted_on_value = max(aborted_on)
2202 else:
2203 aborted_by_value = 'autotest_system'
2204 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002205
showarda0382352009-02-11 23:36:43 +00002206 self._write_keyval_after_job("aborted_by", aborted_by_value)
2207 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002208
showardcbd74612008-11-19 21:42:02 +00002209 aborted_on_string = str(datetime.datetime.fromtimestamp(
2210 aborted_on_value))
2211 self._write_status_comment('Job aborted by %s on %s' %
2212 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002213
2214
jadmanski0afbb632008-06-06 21:10:57 +00002215 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002216 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002217 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002218 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002219
2220
jadmanski0afbb632008-06-06 21:10:57 +00002221 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002222 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002223 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002224
2225
2226class QueueTask(AbstractQueueTask):
2227 def __init__(self, queue_entries):
2228 super(QueueTask, self).__init__(queue_entries)
2229 self._set_ids(queue_entries=queue_entries)
2230
2231
2232 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002233 self._check_queue_entry_statuses(
2234 self.queue_entries,
2235 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2236 models.HostQueueEntry.Status.RUNNING),
2237 allowed_host_statuses=(models.Host.Status.PENDING,
2238 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002239
2240 super(QueueTask, self).prolog()
2241
2242 for queue_entry in self.queue_entries:
2243 self._write_host_keyvals(queue_entry.host)
2244 queue_entry.host.set_status(models.Host.Status.RUNNING)
2245 queue_entry.host.update_field('dirty', 1)
2246 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2247 # TODO(gps): Remove this if nothing needs it anymore.
2248 # A potential user is: tko/parser
2249 self.job.write_to_machines_file(self.queue_entries[0])
2250
2251
2252 def _finish_task(self):
2253 super(QueueTask, self)._finish_task()
2254
2255 for queue_entry in self.queue_entries:
2256 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002257
2258
mbligh4608b002010-01-05 18:22:35 +00002259class HostlessQueueTask(AbstractQueueTask):
2260 def __init__(self, queue_entry):
2261 super(HostlessQueueTask, self).__init__([queue_entry])
2262 self.queue_entry_ids = [queue_entry.id]
2263
2264
2265 def prolog(self):
2266 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2267 super(HostlessQueueTask, self).prolog()
2268
2269
mbligh4608b002010-01-05 18:22:35 +00002270 def _finish_task(self):
2271 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002272 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002273
2274
showardd3dc1992009-04-22 21:01:40 +00002275class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002276 def __init__(self, queue_entries, log_file_name):
2277 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002278
showardd1195652009-12-08 22:21:02 +00002279 self.queue_entries = queue_entries
2280
showardd3dc1992009-04-22 21:01:40 +00002281 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002282 self._autoserv_monitor.attach_to_existing_process(
2283 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002284
showardd1195652009-12-08 22:21:02 +00002285
2286 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002287 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002288 return 'true'
2289 return self._generate_command(
2290 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002291
2292
2293 def _generate_command(self, results_dir):
2294 raise NotImplementedError('Subclasses must override this')
2295
2296
showardd1195652009-12-08 22:21:02 +00002297 @property
2298 def owner_username(self):
2299 return self.queue_entries[0].job.owner
2300
2301
2302 def _working_directory(self):
2303 return self._get_consistent_execution_path(self.queue_entries)
2304
2305
2306 def _paired_with_monitor(self):
2307 return self._autoserv_monitor
2308
2309
showardd3dc1992009-04-22 21:01:40 +00002310 def _job_was_aborted(self):
2311 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002312 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002313 queue_entry.update_from_database()
2314 if was_aborted is None: # first queue entry
2315 was_aborted = bool(queue_entry.aborted)
2316 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2317 email_manager.manager.enqueue_notify_email(
2318 'Inconsistent abort state',
2319 'Queue entries have inconsistent abort state: ' +
2320 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2321 # don't crash here, just assume true
2322 return True
2323 return was_aborted
2324
2325
showardd1195652009-12-08 22:21:02 +00002326 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002327 if self._job_was_aborted():
2328 return models.HostQueueEntry.Status.ABORTED
2329
2330 # we'll use a PidfileRunMonitor to read the autoserv exit status
2331 if self._autoserv_monitor.exit_code() == 0:
2332 return models.HostQueueEntry.Status.COMPLETED
2333 return models.HostQueueEntry.Status.FAILED
2334
2335
showardd3dc1992009-04-22 21:01:40 +00002336 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002337 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002338 queue_entry.set_status(status)
2339
2340
2341 def abort(self):
2342 # override AgentTask.abort() to avoid killing the process and ending
2343 # the task. post-job tasks continue when the job is aborted.
2344 pass
2345
2346
mbligh4608b002010-01-05 18:22:35 +00002347 def _pidfile_label(self):
2348 # '.autoserv_execute' -> 'autoserv'
2349 return self._pidfile_name()[1:-len('_execute')]
2350
2351
showard9bb960b2009-11-19 01:02:11 +00002352class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002353 """
2354 Task responsible for
2355 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2356 * copying logs to the results repository
2357 * spawning CleanupTasks for hosts, if necessary
2358 * spawning a FinalReparseTask for the job
2359 """
showardd1195652009-12-08 22:21:02 +00002360 def __init__(self, queue_entries, recover_run_monitor=None):
2361 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002362 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002363 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002364 self._set_ids(queue_entries=queue_entries)
2365
2366
2367 def _generate_command(self, results_dir):
2368 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002369 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002370        return [_autoserv_path, '-p',
2371 '--pidfile-label=%s' % self._pidfile_label(),
2372 '--use-existing-results', '--collect-crashinfo',
2373 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002374
2375
showardd1195652009-12-08 22:21:02 +00002376 @property
2377 def num_processes(self):
2378 return len(self.queue_entries)
2379
2380
2381 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002382 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002383
2384
showardd3dc1992009-04-22 21:01:40 +00002385 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002386 self._check_queue_entry_statuses(
2387 self.queue_entries,
2388 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2389 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002390
showardd3dc1992009-04-22 21:01:40 +00002391 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002392
2393
showardd3dc1992009-04-22 21:01:40 +00002394 def epilog(self):
2395 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002396 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002397 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002398
showard9bb960b2009-11-19 01:02:11 +00002399
2400 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002401 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002402 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002403 models.HostQueueEntry.Status.COMPLETED)
2404 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2405 else:
2406 final_success = False
2407 num_tests_failed = 0
2408
showard9bb960b2009-11-19 01:02:11 +00002409 reboot_after = self._job.reboot_after
2410 do_reboot = (
2411 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002412 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002413 or reboot_after == model_attributes.RebootAfter.ALWAYS
2414 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002415 and final_success and num_tests_failed == 0))
2416
showardd1195652009-12-08 22:21:02 +00002417 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002418 if do_reboot:
2419 # don't pass the queue entry to the CleanupTask. if the cleanup
2420 # fails, the job doesn't care -- it's over.
2421 models.SpecialTask.objects.create(
2422 host=models.Host.objects.get(id=queue_entry.host.id),
2423 task=models.SpecialTask.Task.CLEANUP,
2424 requested_by=self._job.owner_model())
2425 else:
2426 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002427

    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)


class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
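    # how the throttle works: run() declines to start a process while the
    # class-wide count is at _max_processes(); tick() then retries
    # _try_starting_process() on every scheduler cycle until a slot frees up,
    # after which the task behaves like an ordinary PostJobTask.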
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if self._process_started():
            super(SelfThrottledPostJobTask, self).tick()
        else:
            self._try_starting_process()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if not self._can_run_new_process():
            return

        # actually run the command
        super(SelfThrottledPostJobTask, self).run()
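        # the superclass's run() may decline to start a process; only count
        # it toward the limit if a monitor actually exists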
        if self._process_started():
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


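    # for illustration, the generated parse command expands to something like
    # the following (the parser binary is tko/parse under the autotest install
    # dir; the results path is hypothetical):
    #   tko/parse --write-pidfile -l 2 -r -o /results/123-debug_user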
    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        return 0 # don't include parser processes in accounting


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


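    # for illustration, the generated archiving command re-invokes autoserv
    # against the existing results with a server-side control file; it expands
    # to something like (label and results path are hypothetical):
    #   autoserv -p --pidfile-label=<label> -r /results/123-debug_user \
    #       --use-existing-results --control-filename=control.archive \
    #       <AUTOTEST_INSTALL_DIR>/scheduler/archive_results.control.srv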
    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
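        # on failure, leave a marker file in the results directory so the
        # archiving failure is visible alongside the job's other results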
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())


if __name__ == '__main__':
    main()