blob: 86dd5358109370f0627d460ca5e3ec0e613edec1 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000024from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000025from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000026from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000027from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000028from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An explicit AUTOTEST_DIR in the environment overrides the path derived from
# this file's location.  ('in' replaces the deprecated dict.has_key().)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared across the scheduler.
_db = None          # database connection, set up in initialize()
_shutdown = False   # set True by the SIGINT handler to stop the main loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_drone_manager = None   # set by initialize_globals()
mbligh36768f02008-02-22 18:28:33 +000058
59
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
65
66
mbligh83c1e9e2009-05-01 23:10:41 +000067def _site_init_monitor_db_dummy():
68 return {}
69
70
# Site hook: returns an iterable of site-specific metahost schedulers, or an
# empty tuple when no site module provides 'get_metahost_schedulers'.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000074
75
def main():
    """Scheduler entry point.

    Runs the real main loop, logging any exception that escapes it (while
    letting SystemExit propagate untouched) and always removing the
    scheduler's pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # normal termination request; re-raise without logging as an error
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000087
88
def main_without_exception_handling():
    """Set up and run the scheduler's dispatch loop.

    Parses command-line options (exactly one positional arg: results_dir),
    verifies the scheduler is enabled in the global config, runs the site
    initialization hook, chdirs into the results directory, starts the status
    server, then ticks the Dispatcher until a shutdown is requested.  Whether
    the loop exits normally or via a logged exception, queued emails are
    flushed and the status server, drone manager and database are shut down.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # top-level boundary: report the stacktrace by email, then fall
        # through to the shutdown/cleanup sequence below
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000152
153
def setup_logging():
    """Configure scheduler logging, honoring optional environment overrides
    for the log directory and logfile name."""
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR'),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME'))
160
161
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown of the dispatch loop by
    setting the module-level _shutdown flag (polled in the main loop)."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000166
167
def initialize():
    """One-time scheduler startup.

    Order matters here: we first ensure no other monitor_db is running (via
    the pid file) and claim the pid file ourselves, then connect the
    database, install the SIGINT handler, create the global drone manager,
    and finally initialize it with the configured drones and results host.
    Exits the process if another instance is already alive.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # refuse to run two schedulers at once
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # point at the stress-test database instead of the real one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # 'drones' is a comma-separated hostname list; results are copied to
    # 'results_host' (both default to localhost)
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000205
206
def initialize_globals():
    """Bind the module-level _drone_manager to the singleton instance."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
210
211
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line as a list: executable plus parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        # fall back to the queue entry's job when no Job was given directly
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
236
237
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
240
241
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return a dict of host id -> Host for every schedulable host:
        not locked, no active host queue entry, status NULL or 'Ready'."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an iterable of ids as a comma-separated SQL IN-list body."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (containing one %s IN-list placeholder) against
        id_list and fold the two-column result into a dict of sets.
        Returns {} without querying when id_list is empty."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left_id, right_id) rows into {left_id: set(right_ids)};
        with flip=True, map right_id -> set(left_ids) instead."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} for the jobs' owners."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return the pair ({label_id: set(host_ids)},
        {host_id: set(label_ids)}) for the given hosts; a pair of empty
        dicts when host_ids is empty."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover state at startup."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all cached scheduling state for this tick: ready hosts,
        and per-job ACL/ineligibility/dependency maps restricted to the jobs
        of the given pending queue entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the per-tick callback to every metahost scheduler."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a (fresh) set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Remove a host from the cached label->hosts map.
        NOTE(review): raises KeyError if the pairing is absent — presumably
        callers only pass known pairs; verify against callers."""
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Claim a host for this cycle: remove and return it from the
        available-hosts cache (KeyError if not available)."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a (fresh) set of host ids ineligible for the entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Logs an error if more than
        one atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True when the host may run the entry's job: either the host is a
        one-time (invalid) host, or it passes the ACL, dependency,
        only_if_needed and atomic-group checks."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """Truthy when the host is cached as available and marked invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's explicitly requested host if it is eligible;
        returns the Host or None."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True when the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule one queue entry: directly for non-metahosts, otherwise
        via the first metahost scheduler that accepts it.

        @raises SchedulerError when no metahost scheduler can handle the
                entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
643
644
showard170873e2009-01-07 00:22:26 +0000645class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000646 def __init__(self):
647 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000648 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000649 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000650 user_cleanup_time = scheduler_config.config.clean_interval
651 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
652 _db, user_cleanup_time)
653 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000654 self._host_agents = {}
655 self._queue_entry_agents = {}
showardf13a9e22009-12-18 22:54:09 +0000656 self._tick_count = 0
657 self._last_garbage_stats_time = time.time()
658 self._seconds_between_garbage_stats = 60 * (
659 global_config.global_config.get_config_value(
660 scheduler_config.CONFIG_SECTION,
661 'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000662
mbligh36768f02008-02-22 18:28:33 +0000663
showard915958d2009-04-22 21:00:58 +0000664 def initialize(self, recover_hosts=True):
665 self._periodic_cleanup.initialize()
666 self._24hr_upkeep.initialize()
667
jadmanski0afbb632008-06-06 21:10:57 +0000668 # always recover processes
669 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000670
jadmanski0afbb632008-06-06 21:10:57 +0000671 if recover_hosts:
672 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000673
jamesrenc44ae992010-02-19 00:12:54 +0000674 self._host_scheduler.recovery_on_startup()
675
mbligh36768f02008-02-22 18:28:33 +0000676
jadmanski0afbb632008-06-06 21:10:57 +0000677 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000678 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000679 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000680 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000681 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000682 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000683 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000684 self._schedule_running_host_queue_entries()
685 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000686 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000687 self._handle_agents()
jamesrene21bf412010-02-26 02:30:07 +0000688 self._host_scheduler.tick()
showard170873e2009-01-07 00:22:26 +0000689 _drone_manager.execute_actions()
690 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000691 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000692 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000693
showard97aed502008-11-04 02:01:24 +0000694
mblighf3294cc2009-04-08 21:17:38 +0000695 def _run_cleanup(self):
696 self._periodic_cleanup.run_cleanup_maybe()
697 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000698
mbligh36768f02008-02-22 18:28:33 +0000699
showardf13a9e22009-12-18 22:54:09 +0000700 def _garbage_collection(self):
701 threshold_time = time.time() - self._seconds_between_garbage_stats
702 if threshold_time < self._last_garbage_stats_time:
703 # Don't generate these reports very often.
704 return
705
706 self._last_garbage_stats_time = time.time()
707 # Force a full level 0 collection (because we can, it doesn't hurt
708 # at this interval).
709 gc.collect()
710 logging.info('Logging garbage collector stats on tick %d.',
711 self._tick_count)
712 gc_stats._log_garbage_collector_stats()
713
714
showard170873e2009-01-07 00:22:26 +0000715 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
716 for object_id in object_ids:
717 agent_dict.setdefault(object_id, set()).add(agent)
718
719
720 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
721 for object_id in object_ids:
722 assert object_id in agent_dict
723 agent_dict[object_id].remove(agent)
724
725
showardd1195652009-12-08 22:21:02 +0000726 def add_agent_task(self, agent_task):
727 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000728 self._agents.append(agent)
729 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000730 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
731 self._register_agent_for_ids(self._queue_entry_agents,
732 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000733
showard170873e2009-01-07 00:22:26 +0000734
735 def get_agents_for_entry(self, queue_entry):
736 """
737 Find agents corresponding to the specified queue_entry.
738 """
showardd3dc1992009-04-22 21:01:40 +0000739 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000740
741
742 def host_has_agent(self, host):
743 """
744 Determine if there is currently an Agent present using this host.
745 """
746 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000747
748
jadmanski0afbb632008-06-06 21:10:57 +0000749 def remove_agent(self, agent):
750 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000751 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
752 agent)
753 self._unregister_agent_for_ids(self._queue_entry_agents,
754 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000755
756
showard8cc058f2009-09-08 16:26:33 +0000757 def _host_has_scheduled_special_task(self, host):
758 return bool(models.SpecialTask.objects.filter(host__id=host.id,
759 is_active=False,
760 is_complete=False))
761
762
jadmanski0afbb632008-06-06 21:10:57 +0000763 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000764 agent_tasks = self._create_recovery_agent_tasks()
765 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000766 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000767 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000768 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000769 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000770 self._reverify_remaining_hosts()
771 # reinitialize drones after killing orphaned processes, since they can
772 # leave around files when they die
773 _drone_manager.execute_actions()
774 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000775
showard170873e2009-01-07 00:22:26 +0000776
showardd1195652009-12-08 22:21:02 +0000777 def _create_recovery_agent_tasks(self):
778 return (self._get_queue_entry_agent_tasks()
779 + self._get_special_task_agent_tasks(is_active=True))
780
781
782 def _get_queue_entry_agent_tasks(self):
783 # host queue entry statuses handled directly by AgentTasks (Verifying is
784 # handled through SpecialTasks, so is not listed here)
785 statuses = (models.HostQueueEntry.Status.STARTING,
786 models.HostQueueEntry.Status.RUNNING,
787 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000788 models.HostQueueEntry.Status.PARSING,
789 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000790 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000791 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000792 where='status IN (%s)' % status_list)
793
794 agent_tasks = []
795 used_queue_entries = set()
796 for entry in queue_entries:
797 if self.get_agents_for_entry(entry):
798 # already being handled
799 continue
800 if entry in used_queue_entries:
801 # already picked up by a synchronous job
802 continue
803 agent_task = self._get_agent_task_for_queue_entry(entry)
804 agent_tasks.append(agent_task)
805 used_queue_entries.update(agent_task.queue_entries)
806 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000807
808
showardd1195652009-12-08 22:21:02 +0000809 def _get_special_task_agent_tasks(self, is_active=False):
810 special_tasks = models.SpecialTask.objects.filter(
811 is_active=is_active, is_complete=False)
812 return [self._get_agent_task_for_special_task(task)
813 for task in special_tasks]
814
815
816 def _get_agent_task_for_queue_entry(self, queue_entry):
817 """
818 Construct an AgentTask instance for the given active HostQueueEntry,
819 if one can currently run it.
820 @param queue_entry: a HostQueueEntry
821 @returns an AgentTask to run the queue entry
822 """
823 task_entries = queue_entry.job.get_group_entries(queue_entry)
824 self._check_for_duplicate_host_entries(task_entries)
825
826 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
827 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000828 if queue_entry.is_hostless():
829 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000830 return QueueTask(queue_entries=task_entries)
831 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
832 return GatherLogsTask(queue_entries=task_entries)
833 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
834 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000835 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
836 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000837
838 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000839 'invalid status %s: %s'
840 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000841
842
843 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000844 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
845 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000846 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000847 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000848 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000849 if using_host:
showardd1195652009-12-08 22:21:02 +0000850 self._assert_host_has_no_agent(task_entry)
851
852
853 def _assert_host_has_no_agent(self, entry):
854 """
855 @param entry: a HostQueueEntry or a SpecialTask
856 """
857 if self.host_has_agent(entry.host):
858 agent = tuple(self._host_agents.get(entry.host.id))[0]
859 raise SchedulerError(
860 'While scheduling %s, host %s already has a host agent %s'
861 % (entry, entry.host, agent.task))
862
863
864 def _get_agent_task_for_special_task(self, special_task):
865 """
866 Construct an AgentTask class to run the given SpecialTask and add it
867 to this dispatcher.
868 @param special_task: a models.SpecialTask instance
869 @returns an AgentTask to run this SpecialTask
870 """
871 self._assert_host_has_no_agent(special_task)
872
873 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
874 for agent_task_class in special_agent_task_classes:
875 if agent_task_class.TASK_TYPE == special_task.task:
876 return agent_task_class(task=special_task)
877
878 raise SchedulerError('No AgentTask class for task', str(special_task))
879
880
881 def _register_pidfiles(self, agent_tasks):
882 for agent_task in agent_tasks:
883 agent_task.register_necessary_pidfiles()
884
885
886 def _recover_tasks(self, agent_tasks):
887 orphans = _drone_manager.get_orphaned_autoserv_processes()
888
889 for agent_task in agent_tasks:
890 agent_task.recover()
891 if agent_task.monitor and agent_task.monitor.has_process():
892 orphans.discard(agent_task.monitor.get_process())
893 self.add_agent_task(agent_task)
894
895 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000896
897
showard8cc058f2009-09-08 16:26:33 +0000898 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000899 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
900 % status):
showard0db3d432009-10-12 20:29:15 +0000901 if entry.status == status and not self.get_agents_for_entry(entry):
902 # The status can change during iteration, e.g., if job.run()
903 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000904 yield entry
905
906
showard6878e8b2009-07-20 22:37:45 +0000907 def _check_for_remaining_orphan_processes(self, orphans):
908 if not orphans:
909 return
910 subject = 'Unrecovered orphan autoserv processes remain'
911 message = '\n'.join(str(process) for process in orphans)
912 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000913
914 die_on_orphans = global_config.global_config.get_config_value(
915 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
916
917 if die_on_orphans:
918 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000919
showard170873e2009-01-07 00:22:26 +0000920
showard8cc058f2009-09-08 16:26:33 +0000921 def _recover_pending_entries(self):
922 for entry in self._get_unassigned_entries(
923 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000924 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000925 entry.on_pending()
926
927
showardb8900452009-10-12 20:31:01 +0000928 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000929 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000930 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
931 unrecovered_hqes = []
932 for queue_entry in queue_entries:
933 special_tasks = models.SpecialTask.objects.filter(
934 task__in=(models.SpecialTask.Task.CLEANUP,
935 models.SpecialTask.Task.VERIFY),
936 queue_entry__id=queue_entry.id,
937 is_complete=False)
938 if special_tasks.count() == 0:
939 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000940
showardb8900452009-10-12 20:31:01 +0000941 if unrecovered_hqes:
942 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000943 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000944 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000945 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000946
947
showard65db3932009-10-28 19:54:35 +0000948 def _get_prioritized_special_tasks(self):
949 """
950 Returns all queued SpecialTasks prioritized for repair first, then
951 cleanup, then verify.
952 """
953 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
954 is_complete=False,
955 host__locked=False)
956 # exclude hosts with active queue entries unless the SpecialTask is for
957 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000958 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000959 queued_tasks, 'afe_host_queue_entries', 'host_id',
960 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000961 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000962 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000963 where=['(afe_host_queue_entries.id IS NULL OR '
964 'afe_host_queue_entries.id = '
965 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000966
showard65db3932009-10-28 19:54:35 +0000967 # reorder tasks by priority
968 task_priority_order = [models.SpecialTask.Task.REPAIR,
969 models.SpecialTask.Task.CLEANUP,
970 models.SpecialTask.Task.VERIFY]
971 def task_priority_key(task):
972 return task_priority_order.index(task.task)
973 return sorted(queued_tasks, key=task_priority_key)
974
975
showard65db3932009-10-28 19:54:35 +0000976 def _schedule_special_tasks(self):
977 """
978 Execute queued SpecialTasks that are ready to run on idle hosts.
979 """
980 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000981 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000982 continue
showardd1195652009-12-08 22:21:02 +0000983 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000984
985
showard170873e2009-01-07 00:22:26 +0000986 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000987 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000988 # should never happen
showarded2afea2009-07-07 20:54:07 +0000989 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000990 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000991 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000992 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000993 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000994
995
jadmanski0afbb632008-06-06 21:10:57 +0000996 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000997 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000998 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000999 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +00001000 if self.host_has_agent(host):
1001 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001002 continue
showard8cc058f2009-09-08 16:26:33 +00001003 if self._host_has_scheduled_special_task(host):
1004 # host will have a special task scheduled on the next cycle
1005 continue
showard170873e2009-01-07 00:22:26 +00001006 if print_message:
showardb18134f2009-03-20 20:52:18 +00001007 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001008 models.SpecialTask.objects.create(
1009 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001010 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001011
1012
jadmanski0afbb632008-06-06 21:10:57 +00001013 def _recover_hosts(self):
1014 # recover "Repair Failed" hosts
1015 message = 'Reverifying dead host %s'
1016 self._reverify_hosts_where("status = 'Repair Failed'",
1017 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001018
1019
showard04c82c52008-05-29 19:38:12 +00001020
showardb95b1bd2008-08-15 18:11:04 +00001021 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001022 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +00001023 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001024 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001025 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001026 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001027
1028
showard89f84db2009-03-12 20:39:13 +00001029 def _refresh_pending_queue_entries(self):
1030 """
1031 Lookup the pending HostQueueEntries and call our HostScheduler
1032 refresh() method given that list. Return the list.
1033
1034 @returns A list of pending HostQueueEntries sorted in priority order.
1035 """
showard63a34772008-08-18 19:32:50 +00001036 queue_entries = self._get_pending_queue_entries()
1037 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001038 return []
showardb95b1bd2008-08-15 18:11:04 +00001039
showard63a34772008-08-18 19:32:50 +00001040 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001041
showard89f84db2009-03-12 20:39:13 +00001042 return queue_entries
1043
1044
1045 def _schedule_atomic_group(self, queue_entry):
1046 """
1047 Schedule the given queue_entry on an atomic group of hosts.
1048
1049 Returns immediately if there are insufficient available hosts.
1050
1051 Creates new HostQueueEntries based off of queue_entry for the
1052 scheduled hosts and starts them all running.
1053 """
1054 # This is a virtual host queue entry representing an entire
1055 # atomic group, find a group and schedule their hosts.
1056 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1057 queue_entry)
1058 if not group_hosts:
1059 return
showardcbe6f942009-06-17 19:33:49 +00001060
1061 logging.info('Expanding atomic group entry %s with hosts %s',
1062 queue_entry,
1063 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +00001064
showard89f84db2009-03-12 20:39:13 +00001065 for assigned_host in group_hosts[1:]:
1066 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +00001067 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001068 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +00001069 new_hqe.set_host(assigned_host)
1070 self._run_queue_entry(new_hqe)
1071
1072 # The first assigned host uses the original HostQueueEntry
1073 queue_entry.set_host(group_hosts[0])
1074 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001075
1076
showarda9545c02009-12-18 22:44:26 +00001077 def _schedule_hostless_job(self, queue_entry):
1078 self.add_agent_task(HostlessQueueTask(queue_entry))
1079
1080
showard89f84db2009-03-12 20:39:13 +00001081 def _schedule_new_jobs(self):
1082 queue_entries = self._refresh_pending_queue_entries()
1083 if not queue_entries:
1084 return
1085
showard63a34772008-08-18 19:32:50 +00001086 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001087 is_unassigned_atomic_group = (
1088 queue_entry.atomic_group_id is not None
1089 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001090
1091 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001092 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001093 elif is_unassigned_atomic_group:
1094 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001095 else:
jamesren883492a2010-02-12 00:45:18 +00001096 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001097 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001098 assert assigned_host.id == queue_entry.host_id
1099 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001100
1101
showard8cc058f2009-09-08 16:26:33 +00001102 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001103 for agent_task in self._get_queue_entry_agent_tasks():
1104 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001105
1106
1107 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +00001108 for entry in scheduler_models.HostQueueEntry.fetch(
1109 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001110 task = entry.job.schedule_delayed_callback_task(entry)
1111 if task:
showardd1195652009-12-08 22:21:02 +00001112 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001113
1114
jamesren883492a2010-02-12 00:45:18 +00001115 def _run_queue_entry(self, queue_entry):
1116 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001117
1118
jadmanski0afbb632008-06-06 21:10:57 +00001119 def _find_aborting(self):
jamesrenc44ae992010-02-19 00:12:54 +00001120 for entry in scheduler_models.HostQueueEntry.fetch(
1121 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001122 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001123 for agent in self.get_agents_for_entry(entry):
1124 agent.abort()
1125 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001126
1127
showard324bf812009-01-20 23:23:38 +00001128 def _can_start_agent(self, agent, num_started_this_cycle,
1129 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001130 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001131 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001132 return True
1133 # don't allow any nonzero-process agents to run after we've reached a
1134 # limit (this avoids starvation of many-process agents)
1135 if have_reached_limit:
1136 return False
1137 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001138 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001139 agent.task.owner_username)
1140 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001141 return False
1142 # if a single agent exceeds the per-cycle throttling, still allow it to
1143 # run when it's the first agent in the cycle
1144 if num_started_this_cycle == 0:
1145 return True
1146 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001147 if (num_started_this_cycle + agent.task.num_processes >
1148 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001149 return False
1150 return True
1151
1152
jadmanski0afbb632008-06-06 21:10:57 +00001153 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001154 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001155 have_reached_limit = False
1156 # iterate over copy, so we can remove agents during iteration
1157 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001158 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001159 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001160 have_reached_limit):
1161 have_reached_limit = True
1162 continue
showardd1195652009-12-08 22:21:02 +00001163 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001164 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001165 if agent.is_done():
1166 logging.info("agent finished")
1167 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001168 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001169 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001170
1171
showard29f7cd22009-04-29 21:16:24 +00001172 def _process_recurring_runs(self):
1173 recurring_runs = models.RecurringRun.objects.filter(
1174 start_date__lte=datetime.datetime.now())
1175 for rrun in recurring_runs:
1176 # Create job from template
1177 job = rrun.job
1178 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001179 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001180
1181 host_objects = info['hosts']
1182 one_time_hosts = info['one_time_hosts']
1183 metahost_objects = info['meta_hosts']
1184 dependencies = info['dependencies']
1185 atomic_group = info['atomic_group']
1186
1187 for host in one_time_hosts or []:
1188 this_host = models.Host.create_one_time_host(host.hostname)
1189 host_objects.append(this_host)
1190
1191 try:
1192 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001193 options=options,
showard29f7cd22009-04-29 21:16:24 +00001194 host_objects=host_objects,
1195 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001196 atomic_group=atomic_group)
1197
1198 except Exception, ex:
1199 logging.exception(ex)
1200 #TODO send email
1201
1202 if rrun.loop_count == 1:
1203 rrun.delete()
1204 else:
1205 if rrun.loop_count != 0: # if not infinite loop
1206 # calculate new start_date
1207 difference = datetime.timedelta(seconds=rrun.loop_period)
1208 rrun.start_date = rrun.start_date + difference
1209 rrun.loop_count -= 1
1210 rrun.save()
1211
1212
showard170873e2009-01-07 00:22:26 +00001213class PidfileRunMonitor(object):
1214 """
1215 Client must call either run() to start a new process or
1216 attach_to_existing_process().
1217 """
mbligh36768f02008-02-22 18:28:33 +00001218
showard170873e2009-01-07 00:22:26 +00001219 class _PidfileException(Exception):
1220 """
1221 Raised when there's some unexpected behavior with the pid file, but only
1222 used internally (never allowed to escape this class).
1223 """
mbligh36768f02008-02-22 18:28:33 +00001224
1225
showard170873e2009-01-07 00:22:26 +00001226 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001227 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001228 self._start_time = None
1229 self.pidfile_id = None
1230 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001231
1232
showard170873e2009-01-07 00:22:26 +00001233 def _add_nice_command(self, command, nice_level):
1234 if not nice_level:
1235 return command
1236 return ['nice', '-n', str(nice_level)] + command
1237
1238
1239 def _set_start_time(self):
1240 self._start_time = time.time()
1241
1242
showard418785b2009-11-23 20:19:59 +00001243 def run(self, command, working_directory, num_processes, nice_level=None,
1244 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1245 username=None):
showard170873e2009-01-07 00:22:26 +00001246 assert command is not None
1247 if nice_level is not None:
1248 command = ['nice', '-n', str(nice_level)] + command
1249 self._set_start_time()
1250 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001251 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001252 num_processes=num_processes, log_file=log_file,
1253 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001254
1255
showarded2afea2009-07-07 20:54:07 +00001256 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001257 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001258 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001259 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001260 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001261 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001262 if num_processes is not None:
1263 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001264
1265
jadmanski0afbb632008-06-06 21:10:57 +00001266 def kill(self):
showard170873e2009-01-07 00:22:26 +00001267 if self.has_process():
1268 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001269
mbligh36768f02008-02-22 18:28:33 +00001270
showard170873e2009-01-07 00:22:26 +00001271 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001272 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001273 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001274
1275
showard170873e2009-01-07 00:22:26 +00001276 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001277 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001278 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001279 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001280
1281
showard170873e2009-01-07 00:22:26 +00001282 def _read_pidfile(self, use_second_read=False):
1283 assert self.pidfile_id is not None, (
1284 'You must call run() or attach_to_existing_process()')
1285 contents = _drone_manager.get_pidfile_contents(
1286 self.pidfile_id, use_second_read=use_second_read)
1287 if contents.is_invalid():
1288 self._state = drone_manager.PidfileContents()
1289 raise self._PidfileException(contents)
1290 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001291
1292
showard21baa452008-10-21 00:08:39 +00001293 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001294 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1295 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001296 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001297 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001298
1299
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, detecting the case where
        autoserv died without recording an exit status.  The statement order
        here matters: the second pidfile read closes the race where the
        process exits between the first read and the liveness check.
        """
        if self.lost_process:
            # Already gave up on this process; keep the final state as-is.
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001322
showard21baa452008-10-21 00:08:39 +00001323
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            # invalid pidfile contents: notify and treat the process as lost
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001335
1336
showard170873e2009-01-07 00:22:26 +00001337 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001338 """\
1339 Called when no pidfile is found or no pid is in the pidfile.
1340 """
showard170873e2009-01-07 00:22:26 +00001341 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001342 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001343 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001344 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001345 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001346
1347
showard35162b02009-03-03 02:17:30 +00001348 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001349 """\
1350 Called when autoserv has exited without writing an exit status,
1351 or we've timed out waiting for autoserv to write a pid to the
1352 pidfile. In either case, we just return failure and the caller
1353 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001354
showard170873e2009-01-07 00:22:26 +00001355 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001356 """
1357 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001358 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001359 self._state.exit_status = 1
1360 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001361
1362
jadmanski0afbb632008-06-06 21:10:57 +00001363 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001364 self._get_pidfile_info()
1365 return self._state.exit_status
1366
1367
1368 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001369 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001370 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001371 if self._state.num_tests_failed is None:
1372 return -1
showard21baa452008-10-21 00:08:39 +00001373 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001374
1375
showardcdaeae82009-08-31 18:32:48 +00001376 def try_copy_results_on_drone(self, **kwargs):
1377 if self.has_process():
1378 # copy results logs into the normal place for job results
1379 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1380
1381
1382 def try_copy_to_results_repository(self, source, **kwargs):
1383 if self.has_process():
1384 _drone_manager.copy_to_results_repository(self.get_process(),
1385 source, **kwargs)
1386
1387
mbligh36768f02008-02-22 18:28:33 +00001388class Agent(object):
showard77182562009-06-10 00:16:05 +00001389 """
showard8cc058f2009-09-08 16:26:33 +00001390 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001391
1392 The following methods are required on all task objects:
1393 poll() - Called periodically to let the task check its status and
1394 update its internal state. If the task succeeded.
1395 is_done() - Returns True if the task is finished.
1396 abort() - Called when an abort has been requested. The task must
1397 set its aborted attribute to True if it actually aborted.
1398
1399 The following attributes are required on all task objects:
1400 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001401 success - bool, True if this task succeeded.
1402 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1403 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001404 """
1405
1406
showard418785b2009-11-23 20:19:59 +00001407 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001408 """
showard8cc058f2009-09-08 16:26:33 +00001409 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001410 """
showard8cc058f2009-09-08 16:26:33 +00001411 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001412
showard77182562009-06-10 00:16:05 +00001413 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001414 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001415
showard8cc058f2009-09-08 16:26:33 +00001416 self.queue_entry_ids = task.queue_entry_ids
1417 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001418
showard8cc058f2009-09-08 16:26:33 +00001419 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001420 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001421
1422
jadmanski0afbb632008-06-06 21:10:57 +00001423 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001424 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001425 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001426 self.task.poll()
1427 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001428 self.finished = True
showardec113162008-05-08 00:52:49 +00001429
1430
jadmanski0afbb632008-06-06 21:10:57 +00001431 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001432 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001433
1434
showardd3dc1992009-04-22 21:01:40 +00001435 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001436 if self.task:
1437 self.task.abort()
1438 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001439 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001440 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001441
showardd3dc1992009-04-22 21:01:40 +00001442
mbligh36768f02008-02-22 18:28:33 +00001443class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001444 class _NullMonitor(object):
1445 pidfile_id = None
1446
1447 def has_process(self):
1448 return True
1449
1450
1451 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001452 """
showardd1195652009-12-08 22:21:02 +00001453 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001454 """
jadmanski0afbb632008-06-06 21:10:57 +00001455 self.done = False
showardd1195652009-12-08 22:21:02 +00001456 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001457 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001458 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001459 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001460 self.queue_entry_ids = []
1461 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001462 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001463
1464
1465 def _set_ids(self, host=None, queue_entries=None):
1466 if queue_entries and queue_entries != [None]:
1467 self.host_ids = [entry.host.id for entry in queue_entries]
1468 self.queue_entry_ids = [entry.id for entry in queue_entries]
1469 else:
1470 assert host
1471 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001472
1473
jadmanski0afbb632008-06-06 21:10:57 +00001474 def poll(self):
showard08a36412009-05-05 01:01:13 +00001475 if not self.started:
1476 self.start()
showardd1195652009-12-08 22:21:02 +00001477 if not self.done:
1478 self.tick()
showard08a36412009-05-05 01:01:13 +00001479
1480
1481 def tick(self):
showardd1195652009-12-08 22:21:02 +00001482 assert self.monitor
1483 exit_code = self.monitor.exit_code()
1484 if exit_code is None:
1485 return
mbligh36768f02008-02-22 18:28:33 +00001486
showardd1195652009-12-08 22:21:02 +00001487 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001488 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001489
1490
jadmanski0afbb632008-06-06 21:10:57 +00001491 def is_done(self):
1492 return self.done
mbligh36768f02008-02-22 18:28:33 +00001493
1494
jadmanski0afbb632008-06-06 21:10:57 +00001495 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001496 if self.done:
showardd1195652009-12-08 22:21:02 +00001497 assert self.started
showard08a36412009-05-05 01:01:13 +00001498 return
showardd1195652009-12-08 22:21:02 +00001499 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001500 self.done = True
1501 self.success = success
1502 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001503
1504
jadmanski0afbb632008-06-06 21:10:57 +00001505 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001506 """
1507 To be overridden.
1508 """
showarded2afea2009-07-07 20:54:07 +00001509 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001510 self.register_necessary_pidfiles()
1511
1512
1513 def _log_file(self):
1514 if not self._log_file_name:
1515 return None
1516 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001517
mbligh36768f02008-02-22 18:28:33 +00001518
jadmanski0afbb632008-06-06 21:10:57 +00001519 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001520 log_file = self._log_file()
1521 if self.monitor and log_file:
1522 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001526 """
1527 To be overridden.
1528 """
jadmanski0afbb632008-06-06 21:10:57 +00001529 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001530 logging.info("%s finished with success=%s", type(self).__name__,
1531 self.success)
1532
mbligh36768f02008-02-22 18:28:33 +00001533
1534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001536 if not self.started:
1537 self.prolog()
1538 self.run()
1539
1540 self.started = True
1541
1542
1543 def abort(self):
1544 if self.monitor:
1545 self.monitor.kill()
1546 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001547 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001548 self.cleanup()
1549
1550
showarded2afea2009-07-07 20:54:07 +00001551 def _get_consistent_execution_path(self, execution_entries):
1552 first_execution_path = execution_entries[0].execution_path()
1553 for execution_entry in execution_entries[1:]:
1554 assert execution_entry.execution_path() == first_execution_path, (
1555 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1556 execution_entry,
1557 first_execution_path,
1558 execution_entries[0]))
1559 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001560
1561
showarded2afea2009-07-07 20:54:07 +00001562 def _copy_results(self, execution_entries, use_monitor=None):
1563 """
1564 @param execution_entries: list of objects with execution_path() method
1565 """
showard6d1c1432009-08-20 23:30:39 +00001566 if use_monitor is not None and not use_monitor.has_process():
1567 return
1568
showarded2afea2009-07-07 20:54:07 +00001569 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001570 if use_monitor is None:
1571 assert self.monitor
1572 use_monitor = self.monitor
1573 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001574 execution_path = self._get_consistent_execution_path(execution_entries)
1575 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001576 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001577
showarda1e74b32009-05-12 17:32:04 +00001578
1579 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001580 for queue_entry in queue_entries:
1581 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001582
1583
mbligh4608b002010-01-05 18:22:35 +00001584 def _archive_results(self, queue_entries):
1585 for queue_entry in queue_entries:
1586 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001587
1588
showardd1195652009-12-08 22:21:02 +00001589 def _command_line(self):
1590 """
1591 Return the command line to run. Must be overridden.
1592 """
1593 raise NotImplementedError
1594
1595
1596 @property
1597 def num_processes(self):
1598 """
1599 Return the number of processes forked by this AgentTask's process. It
1600 may only be approximate. To be overridden if necessary.
1601 """
1602 return 1
1603
1604
1605 def _paired_with_monitor(self):
1606 """
1607 If this AgentTask's process must run on the same machine as some
1608 previous process, this method should be overridden to return a
1609 PidfileRunMonitor for that process.
1610 """
1611 return self._NullMonitor()
1612
1613
1614 @property
1615 def owner_username(self):
1616 """
1617 Return login of user responsible for this task. May be None. Must be
1618 overridden.
1619 """
1620 raise NotImplementedError
1621
1622
1623 def _working_directory(self):
1624 """
1625 Return the directory where this AgentTask's process executes. Must be
1626 overridden.
1627 """
1628 raise NotImplementedError
1629
1630
1631 def _pidfile_name(self):
1632 """
1633 Return the name of the pidfile this AgentTask's process uses. To be
1634 overridden if necessary.
1635 """
jamesrenc44ae992010-02-19 00:12:54 +00001636 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001637
1638
1639 def _check_paired_results_exist(self):
1640 if not self._paired_with_monitor().has_process():
1641 email_manager.manager.enqueue_notify_email(
1642 'No paired results in task',
1643 'No paired results in task %s at %s'
1644 % (self, self._paired_with_monitor().pidfile_id))
1645 self.finished(False)
1646 return False
1647 return True
1648
1649
1650 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001651 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001652 self.monitor = PidfileRunMonitor()
1653
1654
1655 def run(self):
1656 if not self._check_paired_results_exist():
1657 return
1658
1659 self._create_monitor()
1660 self.monitor.run(
1661 self._command_line(), self._working_directory(),
1662 num_processes=self.num_processes,
1663 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1664 pidfile_name=self._pidfile_name(),
1665 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1666 username=self.owner_username)
1667
1668
1669 def register_necessary_pidfiles(self):
1670 pidfile_id = _drone_manager.get_pidfile_id_from(
1671 self._working_directory(), self._pidfile_name())
1672 _drone_manager.register_pidfile(pidfile_id)
1673
1674 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1675 if paired_pidfile_id:
1676 _drone_manager.register_pidfile(paired_pidfile_id)
1677
1678
1679 def recover(self):
1680 if not self._check_paired_results_exist():
1681 return
1682
1683 self._create_monitor()
1684 self.monitor.attach_to_existing_process(
1685 self._working_directory(), pidfile_name=self._pidfile_name(),
1686 num_processes=self.num_processes)
1687 if not self.monitor.has_process():
1688 # no process to recover; wait to be started normally
1689 self.monitor = None
1690 return
1691
1692 self.started = True
1693 logging.info('Recovering process %s for %s at %s'
1694 % (self.monitor.get_process(), type(self).__name__,
1695 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001696
1697
mbligh4608b002010-01-05 18:22:35 +00001698 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1699 allowed_host_statuses=None):
1700 for entry in queue_entries:
1701 if entry.status not in allowed_hqe_statuses:
1702 raise SchedulerError('Queue task attempting to start '
1703 'entry with invalid status %s: %s'
1704 % (entry.status, entry))
1705 invalid_host_status = (
1706 allowed_host_statuses is not None
1707 and entry.host.status not in allowed_host_statuses)
1708 if invalid_host_status:
1709 raise SchedulerError('Queue task attempting to start on queue '
1710 'entry with invalid host status %s: %s'
1711 % (entry.host.status, entry))
1712
1713
showardd9205182009-04-27 20:09:55 +00001714class TaskWithJobKeyvals(object):
1715 """AgentTask mixin providing functionality to help with job keyval files."""
1716 _KEYVAL_FILE = 'keyval'
1717 def _format_keyval(self, key, value):
1718 return '%s=%s' % (key, value)
1719
1720
1721 def _keyval_path(self):
1722 """Subclasses must override this"""
1723 raise NotImplemented
1724
1725
1726 def _write_keyval_after_job(self, field, value):
1727 assert self.monitor
1728 if not self.monitor.has_process():
1729 return
1730 _drone_manager.write_lines_to_file(
1731 self._keyval_path(), [self._format_keyval(field, value)],
1732 paired_with_process=self.monitor.get_process())
1733
1734
1735 def _job_queued_keyval(self, job):
1736 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1737
1738
1739 def _write_job_finished(self):
1740 self._write_keyval_after_job("job_finished", int(time.time()))
1741
1742
showarddb502762009-09-09 15:31:20 +00001743 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1744 keyval_contents = '\n'.join(self._format_keyval(key, value)
1745 for key, value in keyval_dict.iteritems())
1746 # always end with a newline to allow additional keyvals to be written
1747 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001748 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001749 keyval_contents,
1750 file_path=keyval_path)
1751
1752
1753 def _write_keyvals_before_job(self, keyval_dict):
1754 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1755
1756
1757 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001758 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001759 host.hostname)
1760 platform, all_labels = host.platform_and_labels()
1761 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1762 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1763
1764
showard8cc058f2009-09-08 16:26:33 +00001765class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001766 """
1767 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1768 """
1769
1770 TASK_TYPE = None
1771 host = None
1772 queue_entry = None
1773
showardd1195652009-12-08 22:21:02 +00001774 def __init__(self, task, extra_command_args):
1775 super(SpecialAgentTask, self).__init__()
1776
showarded2afea2009-07-07 20:54:07 +00001777 assert (self.TASK_TYPE is not None,
1778 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001779
jamesrenc44ae992010-02-19 00:12:54 +00001780 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001781 self.queue_entry = None
1782 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001783 self.queue_entry = scheduler_models.HostQueueEntry(
1784 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001785
showarded2afea2009-07-07 20:54:07 +00001786 self.task = task
1787 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001788
1789
showard8cc058f2009-09-08 16:26:33 +00001790 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001791 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1792
1793
1794 def _command_line(self):
1795 return _autoserv_command_line(self.host.hostname,
1796 self._extra_command_args,
1797 queue_entry=self.queue_entry)
1798
1799
1800 def _working_directory(self):
1801 return self.task.execution_path()
1802
1803
1804 @property
1805 def owner_username(self):
1806 if self.task.requested_by:
1807 return self.task.requested_by.login
1808 return None
showard8cc058f2009-09-08 16:26:33 +00001809
1810
showarded2afea2009-07-07 20:54:07 +00001811 def prolog(self):
1812 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001813 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001814 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001815
1816
showardde634ee2009-01-30 01:44:24 +00001817 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001818 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001819
showard2fe3f1d2009-07-06 20:19:11 +00001820 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001821 return # don't fail metahost entries, they'll be reassigned
1822
showard2fe3f1d2009-07-06 20:19:11 +00001823 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001824 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001825 return # entry has been aborted
1826
showard2fe3f1d2009-07-06 20:19:11 +00001827 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001828 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001829 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001830 self._write_keyval_after_job(queued_key, queued_time)
1831 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001832
showard8cc058f2009-09-08 16:26:33 +00001833 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001834 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001835 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001836 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001837
showard8cc058f2009-09-08 16:26:33 +00001838 pidfile_id = _drone_manager.get_pidfile_id_from(
1839 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001840 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001841 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001842
1843 if self.queue_entry.job.parse_failed_repair:
1844 self._parse_results([self.queue_entry])
1845 else:
1846 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001847
1848
1849 def cleanup(self):
1850 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001851
1852 # We will consider an aborted task to be "Failed"
1853 self.task.finish(bool(self.success))
1854
showardf85a0b72009-10-07 20:48:45 +00001855 if self.monitor:
1856 if self.monitor.has_process():
1857 self._copy_results([self.task])
1858 if self.monitor.pidfile_id is not None:
1859 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001860
1861
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        # translate the host's protection level into autoserv's expected
        # (normalized) attribute name
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection_name = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001898
1899
showarded2afea2009-07-07 20:54:07 +00001900class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001901 def _copy_to_results_repository(self):
1902 if not self.queue_entry or self.queue_entry.meta_host:
1903 return
1904
1905 self.queue_entry.set_execution_subdir()
1906 log_name = os.path.basename(self.task.execution_path())
1907 source = os.path.join(self.task.execution_path(), 'debug',
1908 'autoserv.DEBUG')
1909 destination = os.path.join(
1910 self.queue_entry.execution_path(), log_name)
1911
1912 self.monitor.try_copy_to_results_repository(
1913 source, destination_path=destination)
1914
1915
showard170873e2009-01-07 00:22:26 +00001916 def epilog(self):
1917 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001918
showard775300b2009-09-09 15:30:50 +00001919 if self.success:
1920 return
showard8fe93b52008-11-18 17:53:22 +00001921
showard775300b2009-09-09 15:30:50 +00001922 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001923
showard775300b2009-09-09 15:30:50 +00001924 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001925 # effectively ignore failure for these hosts
1926 self.success = True
showard775300b2009-09-09 15:30:50 +00001927 return
1928
1929 if self.queue_entry:
1930 self.queue_entry.requeue()
1931
1932 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001933 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001934 queue_entry__id=self.queue_entry.id):
1935 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1936 self._fail_queue_entry()
1937 return
1938
showard9bb960b2009-11-19 01:02:11 +00001939 queue_entry = models.HostQueueEntry.objects.get(
1940 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001941 else:
1942 queue_entry = None
1943
1944 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001945 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001946 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001947 queue_entry=queue_entry,
1948 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001949
showard8fe93b52008-11-18 17:53:22 +00001950
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        """@param task: the models.SpecialTask row driving this verify."""
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify per host is enough; discard any other queued verify
        # requests rather than keeping records of them.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False)
        pending_verifies.exclude(id=self.task.id).delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001985
1986
mbligh4608b002010-01-05 18:22:35 +00001987class CleanupTask(PreJobTask):
1988 # note this can also run post-job, but when it does, it's running standalone
1989 # against the host (not related to the job), so it's not considered a
1990 # PostJobTask
1991
1992 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1993
1994
1995 def __init__(self, task, recover_run_monitor=None):
1996 super(CleanupTask, self).__init__(task, ['--cleanup'])
1997 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1998
1999
2000 def prolog(self):
2001 super(CleanupTask, self).prolog()
2002 logging.info("starting cleanup task for host: %s", self.host.hostname)
2003 self.host.set_status(models.Host.Status.CLEANING)
2004 if self.queue_entry:
2005 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2006
2007
2008 def _finish_epilog(self):
2009 if not self.queue_entry or not self.success:
2010 return
2011
2012 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2013 should_run_verify = (
2014 self.queue_entry.job.run_verify
2015 and self.host.protection != do_not_verify_protection)
2016 if should_run_verify:
2017 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2018 models.SpecialTask.objects.create(
2019 host=models.Host.objects.get(id=self.host.id),
2020 queue_entry=entry,
2021 task=models.SpecialTask.Task.VERIFY)
2022 else:
2023 self.queue_entry.on_pending()
2024
2025
2026 def epilog(self):
2027 super(CleanupTask, self).epilog()
2028
2029 if self.success:
2030 self.host.update_field('dirty', 0)
2031 self.host.set_status(models.Host.Status.READY)
2032
2033 self._finish_epilog()
2034
2035
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask

    Runs a job's autoserv process over one or more queue entries that share a
    single execution path, writing job keyvals before and after the run.
    """
    def __init__(self, queue_entries):
        # All entries are taken to belong to one job; the first entry's job
        # is used as the representative job object.
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        """Return the path of the keyval file in the working directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line (list of args) for this job."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # hostless entries contribute no machine to the -m list
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        # non-server jobs get the client-side flag appended
        if not self.job.is_server_job():
            params.append('-c')

        return params


    @property
    def num_processes(self):
        # one process slot per queue entry
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        # all entries must agree on a single execution path
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all entries as Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        # record the host group name, if the first entry defines one
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # drop a 'job_failure' marker file explaining the lost process
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Write the finished keyval; record an error if the process was lost."""
        # no monitor means the task never started a process; nothing to record
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # append an INFO line to status.log, paired with the job's process
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record abort keyvals and a status.log comment for an aborted job."""
        # without a live process there is nowhere to pair the output with
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        """Abort the task, logging the abort and finalizing keyvals."""
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002168
2169
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against real hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Validate entry/host statuses, then mark hosts running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)

        single_entry = len(self.queue_entries) == 1
        if single_entry and self.job.synch_count == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # after the job runs, all entries move on to log gathering
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002201
2202
class HostlessQueueTask(AbstractQueueTask):
    """AbstractQueueTask for a single queue entry with no host attached."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # hostless entries always execute under a fixed subdirectory
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # no hosts to gather logs from: go straight to parsing
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002217
2218
class PostJobTask(AgentTask):
    """
    Base class for tasks run after a job's main autoserv process finishes
    (log gathering, parsing, archiving).

    Attaches a PidfileRunMonitor to the already-finished autoserv process so
    subclasses can read its exit status and pair written files with it.
    """
    def __init__(self, queue_entries, log_file_name):
        """
        @param queue_entries: queue entries sharing one execution path.
        @param log_file_name: log file name for this task's own process.
        """
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor for the finished autoserv process; used by _final_status()
        # and _paired_with_monitor()
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the command (list of args) to run over results_dir."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        # all entries must agree on a single execution path
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """
        Return True if this job's queue entries were aborted.

        All entries are expected to agree; on inconsistency, a notification
        email is enqueued and True is assumed.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # describe every entry's abort state. (previously a single
                # formatted string was passed to join(), which joined its
                # individual *characters* with ', '.)
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Return the HQE status the entries should end up in."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2294
2295
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        # recover_run_monitor is accepted for interface compatibility but is
        # not used in this class
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command over all hosts."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        # one process slot per queue entry
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        """Validate that entries are Gathering and hosts are Running."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        """Kick off parsing and handle post-job host cleanup/reboot."""
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule cleanup for each host, or mark it Ready, per reboot_after."""
        # derive success/failure info from the autoserv process, if it ran
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        """Run crashinfo collection only when autoserv died abnormally."""
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally: skip collection and finish successfully
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002381
2382
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # class-wide count of currently running processes of this task type
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        """Subclasses return the global cap for this task type."""
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._max_processes() > cls._num_running_processes


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # Until our process actually launches, each tick is another launch
        # attempt; once started, fall back to the normal polling behavior.
        if not self._process_started():
            self._try_starting_process()
            return
        super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # don't actually launch unless the throttle allows it
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # launch the command and account for the new process
            super(SelfThrottledPostJobTask, self).run()
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002441
showard21baa452008-10-21 00:08:39 +00002442
class FinalReparseTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the results parser for a job."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # parser processes are deliberately excluded from accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        """Validate that all entries are in the Parsing state."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002481
2482
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Throttled post-job task that runs the results-archiving control file."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [hqe.id for hqe in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r',
                results_dir, '--use-existing-results', control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        """Validate that all entries are in the Archiving state."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        archiver_failed = (not self.success
                           and self._paired_with_monitor().has_process())
        if archiver_failed:
            # leave a marker file in the results dir recording the failure
            message = ('Archiving failed with exit code %s'
                       % self.monitor.exit_code())
            _drone_manager.write_lines_to_file(
                    os.path.join(self._working_directory(),
                                 self._ARCHIVING_FAILED_FILE),
                    [message],
                    paired_with_process=(
                            self._paired_with_monitor().get_process()))
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002529
2530
# Script entry point: main() is defined earlier in this file.
if __name__ == '__main__':
    main()