blob: 8aae8a1a28390bb2f166306d35cd695e79752678 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000024from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000025from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000026from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000027from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000028from autotest_lib.scheduler import scheduler_models
showard549afad2009-08-20 23:33:36 +000029BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
30PID_FILE_PREFIX = 'monitor_db'
mblighb090f142008-02-27 21:33:46 +000031
mbligh36768f02008-02-22 18:28:33 +000032RESULTS_DIR = '.'
33AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000034DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000035AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
36
37if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000038 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000039AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
40AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
41
42if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000043 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000044
showard35162b02009-03-03 02:17:30 +000045# error message to leave in results dir when an autoserv process disappears
46# mysteriously
47_LOST_PROCESS_ERROR = """\
48Autoserv failed abnormally during execution for this job, probably due to a
49system error on the Autotest server. Full results may not be available. Sorry.
50"""
51
mbligh6f8bab42008-02-29 22:45:14 +000052_db = None
mbligh36768f02008-02-22 18:28:33 +000053_shutdown = False
showard170873e2009-01-07 00:22:26 +000054_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
55_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000056_testing_mode = False
jamesrenc44ae992010-02-19 00:12:54 +000057_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000058
59
def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * timeout_mins
65
66
mbligh83c1e9e2009-05-01 23:10:41 +000067def _site_init_monitor_db_dummy():
68 return {}
69
70
# Site hook: resolve the site-specific metahost-scheduler factory if one is
# installed; otherwise fall back to a lambda yielding an empty tuple so the
# scheduler runs with only the built-in metahost schedulers.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000074
75
def main():
    """Top-level entry point.

    Runs the scheduler, logging any exception that escapes (SystemExit is
    re-raised untouched), and always removes the pid file on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000087
88
def main_without_exception_handling():
    """Parse arguments, set up the scheduler process and run the main loop.

    Expects exactly one positional argument: the results directory.  Exits
    with status 1 when the scheduler is disabled in the global config.  On
    loop exit (shutdown request or uncaught exception) queued emails are
    flushed and the status server, drone manager and DB are shut down.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific initialization hook (no-op dummy by default).
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    if options.test:
        # Testing mode: substitute a dummy autoserv binary and flag the rest
        # of the module (DB override happens later in initialize()).
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        # Main loop: tick until a SIGINT flips the module-level _shutdown flag.
        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Orderly teardown regardless of how the loop ended.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000152
153
def setup_logging():
    """Configure scheduler logging, honoring environment-variable overrides
    for the log directory and file name when present."""
    directory = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR')
    filename = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME')
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=directory,
                                      logfile_name=filename)
160
161
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean exit after the current tick.

    The main loop in main_without_exception_handling() polls the module-level
    _shutdown flag; setting it here lets the loop finish its teardown.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000166
167
def initialize():
    """One-time process setup before the dispatcher starts ticking.

    Aborts if another monitor_db is already alive (pid file check), writes
    our own pid file, connects the database (overriding the DB name in
    testing mode), installs the SIGINT handler, and initializes the
    scheduler model layer and drone manager from global-config settings.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Refuse to run two schedulers against the same results/db.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the real one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # Drone list and results host come from the scheduler config section;
    # both default to localhost for a single-machine install.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000205
206
def initialize_globals():
    """Bind the module-level _drone_manager to the drone manager singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
210
211
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv invocation as an argv list.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
236
237
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
    pass
240
241
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors
    that affect that.

    In the past this was done with one or two very large, complex database
    queries.  It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.

    NOTE(review): the _hosts_available/_label_hosts/... caches are rebuilt by
    refresh() each tick; the schedule_* methods mutate them in place, so they
    are only valid within a single scheduling cycle.
    """
    def __init__(self):
        # Built-in metahost schedulers, optionally prefixed by site-specific
        # ones selected via the 'site_metahost_schedulers' config value.
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready (or status-less) hosts.

        Hosts with an active host queue entry are excluded via the LEFT JOIN
        + IS NULL filter.
        """
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run a two-column query (templated with %s for the id list) and
        fold the rows into a {left_id: set(right_id)} mapping; see
        _process_many2many_dict for the flip semantics."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Turn (left, right) rows into {left: set(right)}; when flip is
        True the columns are swapped, giving {right: set(left)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids its owner belongs to."""
        query = """
            SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
            FROM afe_jobs
            INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
            INNER JOIN afe_acl_groups_users ON
                    afe_acl_groups_users.user_id = afe_users.id
            WHERE afe_jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids marked ineligible for it."""
        query = """
            SELECT job_id, host_id
            FROM afe_ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids it depends on."""
        query = """
            SELECT job_id, label_id
            FROM afe_jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
            SELECT host_id, aclgroup_id
            FROM afe_acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return the (label->hosts, host->labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM afe_hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give every metahost scheduler a chance to recover persisted state."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all per-tick caches for the given pending queue entries,
        then tick each metahost scheduler."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()

        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a fresh set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        # Mutates the cached label->hosts mapping; raises KeyError if the
        # label is unknown or the host is not in it.
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Claim a host for this cycle, removing it from the available pool."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a fresh set of host ids ineligible for this entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True when the job's owner shares at least one ACL with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE(review): despite the docstring, this logs rather than
            # raises and returns an arbitrary member of the set.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True when the host passes ACL, dependency, only_if_needed and
        atomic-group checks for the entry's job (invalid hosts bypass all)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # Truthy only when the host is still in the available pool AND its
        # invalid flag is set.
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; None otherwise."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True when the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule one queue entry: directly for a concrete host, otherwise
        via the first metahost scheduler that accepts it.

        @raises SchedulerError when no metahost scheduler can handle it.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one
        # with enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
641
642
showard170873e2009-01-07 00:22:26 +0000643class Dispatcher(object):
    def __init__(self):
        """Set up dispatcher state: agent tracking, the host scheduler,
        periodic cleanup tasks, and garbage-collection stats bookkeeping."""
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes from host / queue-entry to their agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between GC stats dumps; configurable, default 6 hours.
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000660
mbligh36768f02008-02-22 18:28:33 +0000661
showard915958d2009-04-22 21:00:58 +0000662 def initialize(self, recover_hosts=True):
663 self._periodic_cleanup.initialize()
664 self._24hr_upkeep.initialize()
665
jadmanski0afbb632008-06-06 21:10:57 +0000666 # always recover processes
667 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000668
jadmanski0afbb632008-06-06 21:10:57 +0000669 if recover_hosts:
670 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000671
jamesrenc44ae992010-02-19 00:12:54 +0000672 self._host_scheduler.recovery_on_startup()
673
mbligh36768f02008-02-22 18:28:33 +0000674
jadmanski0afbb632008-06-06 21:10:57 +0000675 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000676 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000677 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000678 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000679 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000680 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000681 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000682 self._schedule_running_host_queue_entries()
683 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000684 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000685 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000686 _drone_manager.execute_actions()
687 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000688 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000689 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000690
showard97aed502008-11-04 02:01:24 +0000691
mblighf3294cc2009-04-08 21:17:38 +0000692 def _run_cleanup(self):
693 self._periodic_cleanup.run_cleanup_maybe()
694 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000695
mbligh36768f02008-02-22 18:28:33 +0000696
showardf13a9e22009-12-18 22:54:09 +0000697 def _garbage_collection(self):
698 threshold_time = time.time() - self._seconds_between_garbage_stats
699 if threshold_time < self._last_garbage_stats_time:
700 # Don't generate these reports very often.
701 return
702
703 self._last_garbage_stats_time = time.time()
704 # Force a full level 0 collection (because we can, it doesn't hurt
705 # at this interval).
706 gc.collect()
707 logging.info('Logging garbage collector stats on tick %d.',
708 self._tick_count)
709 gc_stats._log_garbage_collector_stats()
710
711
showard170873e2009-01-07 00:22:26 +0000712 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
713 for object_id in object_ids:
714 agent_dict.setdefault(object_id, set()).add(agent)
715
716
717 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
718 for object_id in object_ids:
719 assert object_id in agent_dict
720 agent_dict[object_id].remove(agent)
721
722
showardd1195652009-12-08 22:21:02 +0000723 def add_agent_task(self, agent_task):
724 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000725 self._agents.append(agent)
726 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000727 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
728 self._register_agent_for_ids(self._queue_entry_agents,
729 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000730
showard170873e2009-01-07 00:22:26 +0000731
732 def get_agents_for_entry(self, queue_entry):
733 """
734 Find agents corresponding to the specified queue_entry.
735 """
showardd3dc1992009-04-22 21:01:40 +0000736 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000737
738
739 def host_has_agent(self, host):
740 """
741 Determine if there is currently an Agent present using this host.
742 """
743 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000744
745
jadmanski0afbb632008-06-06 21:10:57 +0000746 def remove_agent(self, agent):
747 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000748 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
749 agent)
750 self._unregister_agent_for_ids(self._queue_entry_agents,
751 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000752
753
showard8cc058f2009-09-08 16:26:33 +0000754 def _host_has_scheduled_special_task(self, host):
755 return bool(models.SpecialTask.objects.filter(host__id=host.id,
756 is_active=False,
757 is_complete=False))
758
759
jadmanski0afbb632008-06-06 21:10:57 +0000760 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000761 agent_tasks = self._create_recovery_agent_tasks()
762 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000763 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000764 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000765 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000766 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000767 self._reverify_remaining_hosts()
768 # reinitialize drones after killing orphaned processes, since they can
769 # leave around files when they die
770 _drone_manager.execute_actions()
771 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000772
showard170873e2009-01-07 00:22:26 +0000773
showardd1195652009-12-08 22:21:02 +0000774 def _create_recovery_agent_tasks(self):
775 return (self._get_queue_entry_agent_tasks()
776 + self._get_special_task_agent_tasks(is_active=True))
777
778
779 def _get_queue_entry_agent_tasks(self):
780 # host queue entry statuses handled directly by AgentTasks (Verifying is
781 # handled through SpecialTasks, so is not listed here)
782 statuses = (models.HostQueueEntry.Status.STARTING,
783 models.HostQueueEntry.Status.RUNNING,
784 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000785 models.HostQueueEntry.Status.PARSING,
786 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000787 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000788 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000789 where='status IN (%s)' % status_list)
790
791 agent_tasks = []
792 used_queue_entries = set()
793 for entry in queue_entries:
794 if self.get_agents_for_entry(entry):
795 # already being handled
796 continue
797 if entry in used_queue_entries:
798 # already picked up by a synchronous job
799 continue
800 agent_task = self._get_agent_task_for_queue_entry(entry)
801 agent_tasks.append(agent_task)
802 used_queue_entries.update(agent_task.queue_entries)
803 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000804
805
showardd1195652009-12-08 22:21:02 +0000806 def _get_special_task_agent_tasks(self, is_active=False):
807 special_tasks = models.SpecialTask.objects.filter(
808 is_active=is_active, is_complete=False)
809 return [self._get_agent_task_for_special_task(task)
810 for task in special_tasks]
811
812
813 def _get_agent_task_for_queue_entry(self, queue_entry):
814 """
815 Construct an AgentTask instance for the given active HostQueueEntry,
816 if one can currently run it.
817 @param queue_entry: a HostQueueEntry
818 @returns an AgentTask to run the queue entry
819 """
820 task_entries = queue_entry.job.get_group_entries(queue_entry)
821 self._check_for_duplicate_host_entries(task_entries)
822
823 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
824 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000825 if queue_entry.is_hostless():
826 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000827 return QueueTask(queue_entries=task_entries)
828 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
829 return GatherLogsTask(queue_entries=task_entries)
830 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
831 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000832 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
833 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000834
835 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000836 'invalid status %s: %s'
837 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000838
839
840 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000841 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
842 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000843 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000844 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000845 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000846 if using_host:
showardd1195652009-12-08 22:21:02 +0000847 self._assert_host_has_no_agent(task_entry)
848
849
850 def _assert_host_has_no_agent(self, entry):
851 """
852 @param entry: a HostQueueEntry or a SpecialTask
853 """
854 if self.host_has_agent(entry.host):
855 agent = tuple(self._host_agents.get(entry.host.id))[0]
856 raise SchedulerError(
857 'While scheduling %s, host %s already has a host agent %s'
858 % (entry, entry.host, agent.task))
859
860
861 def _get_agent_task_for_special_task(self, special_task):
862 """
863 Construct an AgentTask class to run the given SpecialTask and add it
864 to this dispatcher.
865 @param special_task: a models.SpecialTask instance
866 @returns an AgentTask to run this SpecialTask
867 """
868 self._assert_host_has_no_agent(special_task)
869
870 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
871 for agent_task_class in special_agent_task_classes:
872 if agent_task_class.TASK_TYPE == special_task.task:
873 return agent_task_class(task=special_task)
874
875 raise SchedulerError('No AgentTask class for task', str(special_task))
876
877
878 def _register_pidfiles(self, agent_tasks):
879 for agent_task in agent_tasks:
880 agent_task.register_necessary_pidfiles()
881
882
883 def _recover_tasks(self, agent_tasks):
884 orphans = _drone_manager.get_orphaned_autoserv_processes()
885
886 for agent_task in agent_tasks:
887 agent_task.recover()
888 if agent_task.monitor and agent_task.monitor.has_process():
889 orphans.discard(agent_task.monitor.get_process())
890 self.add_agent_task(agent_task)
891
892 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000893
894
showard8cc058f2009-09-08 16:26:33 +0000895 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000896 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
897 % status):
showard0db3d432009-10-12 20:29:15 +0000898 if entry.status == status and not self.get_agents_for_entry(entry):
899 # The status can change during iteration, e.g., if job.run()
900 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000901 yield entry
902
903
showard6878e8b2009-07-20 22:37:45 +0000904 def _check_for_remaining_orphan_processes(self, orphans):
905 if not orphans:
906 return
907 subject = 'Unrecovered orphan autoserv processes remain'
908 message = '\n'.join(str(process) for process in orphans)
909 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000910
911 die_on_orphans = global_config.global_config.get_config_value(
912 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
913
914 if die_on_orphans:
915 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000916
showard170873e2009-01-07 00:22:26 +0000917
showard8cc058f2009-09-08 16:26:33 +0000918 def _recover_pending_entries(self):
919 for entry in self._get_unassigned_entries(
920 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000921 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000922 entry.on_pending()
923
924
showardb8900452009-10-12 20:31:01 +0000925 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000926 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000927 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
928 unrecovered_hqes = []
929 for queue_entry in queue_entries:
930 special_tasks = models.SpecialTask.objects.filter(
931 task__in=(models.SpecialTask.Task.CLEANUP,
932 models.SpecialTask.Task.VERIFY),
933 queue_entry__id=queue_entry.id,
934 is_complete=False)
935 if special_tasks.count() == 0:
936 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000937
showardb8900452009-10-12 20:31:01 +0000938 if unrecovered_hqes:
939 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000940 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000941 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000942 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000943
944
showard65db3932009-10-28 19:54:35 +0000945 def _get_prioritized_special_tasks(self):
946 """
947 Returns all queued SpecialTasks prioritized for repair first, then
948 cleanup, then verify.
949 """
950 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
951 is_complete=False,
952 host__locked=False)
953 # exclude hosts with active queue entries unless the SpecialTask is for
954 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000955 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000956 queued_tasks, 'afe_host_queue_entries', 'host_id',
957 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000958 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000959 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000960 where=['(afe_host_queue_entries.id IS NULL OR '
961 'afe_host_queue_entries.id = '
962 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000963
showard65db3932009-10-28 19:54:35 +0000964 # reorder tasks by priority
965 task_priority_order = [models.SpecialTask.Task.REPAIR,
966 models.SpecialTask.Task.CLEANUP,
967 models.SpecialTask.Task.VERIFY]
968 def task_priority_key(task):
969 return task_priority_order.index(task.task)
970 return sorted(queued_tasks, key=task_priority_key)
971
972
showard65db3932009-10-28 19:54:35 +0000973 def _schedule_special_tasks(self):
974 """
975 Execute queued SpecialTasks that are ready to run on idle hosts.
976 """
977 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000978 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000979 continue
showardd1195652009-12-08 22:21:02 +0000980 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000981
982
showard170873e2009-01-07 00:22:26 +0000983 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000984 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000985 # should never happen
showarded2afea2009-07-07 20:54:07 +0000986 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000987 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000988 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000989 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000990 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000991
992
jadmanski0afbb632008-06-06 21:10:57 +0000993 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000994 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000995 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +0000996 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +0000997 if self.host_has_agent(host):
998 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000999 continue
showard8cc058f2009-09-08 16:26:33 +00001000 if self._host_has_scheduled_special_task(host):
1001 # host will have a special task scheduled on the next cycle
1002 continue
showard170873e2009-01-07 00:22:26 +00001003 if print_message:
showardb18134f2009-03-20 20:52:18 +00001004 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001005 models.SpecialTask.objects.create(
1006 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001007 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001008
1009
jadmanski0afbb632008-06-06 21:10:57 +00001010 def _recover_hosts(self):
1011 # recover "Repair Failed" hosts
1012 message = 'Reverifying dead host %s'
1013 self._reverify_hosts_where("status = 'Repair Failed'",
1014 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001015
1016
showard04c82c52008-05-29 19:38:12 +00001017
showardb95b1bd2008-08-15 18:11:04 +00001018 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001019 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +00001020 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001021 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001022 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001023 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001024
1025
showard89f84db2009-03-12 20:39:13 +00001026 def _refresh_pending_queue_entries(self):
1027 """
1028 Lookup the pending HostQueueEntries and call our HostScheduler
1029 refresh() method given that list. Return the list.
1030
1031 @returns A list of pending HostQueueEntries sorted in priority order.
1032 """
showard63a34772008-08-18 19:32:50 +00001033 queue_entries = self._get_pending_queue_entries()
1034 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001035 return []
showardb95b1bd2008-08-15 18:11:04 +00001036
showard63a34772008-08-18 19:32:50 +00001037 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001038
showard89f84db2009-03-12 20:39:13 +00001039 return queue_entries
1040
1041
1042 def _schedule_atomic_group(self, queue_entry):
1043 """
1044 Schedule the given queue_entry on an atomic group of hosts.
1045
1046 Returns immediately if there are insufficient available hosts.
1047
1048 Creates new HostQueueEntries based off of queue_entry for the
1049 scheduled hosts and starts them all running.
1050 """
1051 # This is a virtual host queue entry representing an entire
1052 # atomic group, find a group and schedule their hosts.
1053 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1054 queue_entry)
1055 if not group_hosts:
1056 return
showardcbe6f942009-06-17 19:33:49 +00001057
1058 logging.info('Expanding atomic group entry %s with hosts %s',
1059 queue_entry,
1060 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +00001061
showard89f84db2009-03-12 20:39:13 +00001062 for assigned_host in group_hosts[1:]:
1063 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +00001064 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001065 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +00001066 new_hqe.set_host(assigned_host)
1067 self._run_queue_entry(new_hqe)
1068
1069 # The first assigned host uses the original HostQueueEntry
1070 queue_entry.set_host(group_hosts[0])
1071 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001072
1073
showarda9545c02009-12-18 22:44:26 +00001074 def _schedule_hostless_job(self, queue_entry):
1075 self.add_agent_task(HostlessQueueTask(queue_entry))
1076
1077
showard89f84db2009-03-12 20:39:13 +00001078 def _schedule_new_jobs(self):
1079 queue_entries = self._refresh_pending_queue_entries()
1080 if not queue_entries:
1081 return
1082
showard63a34772008-08-18 19:32:50 +00001083 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001084 is_unassigned_atomic_group = (
1085 queue_entry.atomic_group_id is not None
1086 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001087
1088 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001089 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001090 elif is_unassigned_atomic_group:
1091 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001092 else:
jamesren883492a2010-02-12 00:45:18 +00001093 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001094 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001095 assert assigned_host.id == queue_entry.host_id
1096 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001097
1098
showard8cc058f2009-09-08 16:26:33 +00001099 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001100 for agent_task in self._get_queue_entry_agent_tasks():
1101 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001102
1103
1104 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +00001105 for entry in scheduler_models.HostQueueEntry.fetch(
1106 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001107 task = entry.job.schedule_delayed_callback_task(entry)
1108 if task:
showardd1195652009-12-08 22:21:02 +00001109 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001110
1111
jamesren883492a2010-02-12 00:45:18 +00001112 def _run_queue_entry(self, queue_entry):
1113 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001114
1115
jadmanski0afbb632008-06-06 21:10:57 +00001116 def _find_aborting(self):
jamesrenc44ae992010-02-19 00:12:54 +00001117 for entry in scheduler_models.HostQueueEntry.fetch(
1118 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001119 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001120 for agent in self.get_agents_for_entry(entry):
1121 agent.abort()
1122 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001123
1124
showard324bf812009-01-20 23:23:38 +00001125 def _can_start_agent(self, agent, num_started_this_cycle,
1126 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001127 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001128 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001129 return True
1130 # don't allow any nonzero-process agents to run after we've reached a
1131 # limit (this avoids starvation of many-process agents)
1132 if have_reached_limit:
1133 return False
1134 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001135 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001136 agent.task.owner_username)
1137 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001138 return False
1139 # if a single agent exceeds the per-cycle throttling, still allow it to
1140 # run when it's the first agent in the cycle
1141 if num_started_this_cycle == 0:
1142 return True
1143 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001144 if (num_started_this_cycle + agent.task.num_processes >
1145 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001146 return False
1147 return True
1148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001151 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001152 have_reached_limit = False
1153 # iterate over copy, so we can remove agents during iteration
1154 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001155 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001156 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001157 have_reached_limit):
1158 have_reached_limit = True
1159 continue
showardd1195652009-12-08 22:21:02 +00001160 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001161 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001162 if agent.is_done():
1163 logging.info("agent finished")
1164 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001165 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001166 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001167
1168
showard29f7cd22009-04-29 21:16:24 +00001169 def _process_recurring_runs(self):
1170 recurring_runs = models.RecurringRun.objects.filter(
1171 start_date__lte=datetime.datetime.now())
1172 for rrun in recurring_runs:
1173 # Create job from template
1174 job = rrun.job
1175 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001176 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001177
1178 host_objects = info['hosts']
1179 one_time_hosts = info['one_time_hosts']
1180 metahost_objects = info['meta_hosts']
1181 dependencies = info['dependencies']
1182 atomic_group = info['atomic_group']
1183
1184 for host in one_time_hosts or []:
1185 this_host = models.Host.create_one_time_host(host.hostname)
1186 host_objects.append(this_host)
1187
1188 try:
1189 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001190 options=options,
showard29f7cd22009-04-29 21:16:24 +00001191 host_objects=host_objects,
1192 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001193 atomic_group=atomic_group)
1194
1195 except Exception, ex:
1196 logging.exception(ex)
1197 #TODO send email
1198
1199 if rrun.loop_count == 1:
1200 rrun.delete()
1201 else:
1202 if rrun.loop_count != 0: # if not infinite loop
1203 # calculate new start_date
1204 difference = datetime.timedelta(seconds=rrun.loop_period)
1205 rrun.start_date = rrun.start_date + difference
1206 rrun.loop_count -= 1
1207 rrun.save()
1208
1209
showard170873e2009-01-07 00:22:26 +00001210class PidfileRunMonitor(object):
1211 """
1212 Client must call either run() to start a new process or
1213 attach_to_existing_process().
1214 """
mbligh36768f02008-02-22 18:28:33 +00001215
showard170873e2009-01-07 00:22:26 +00001216 class _PidfileException(Exception):
1217 """
1218 Raised when there's some unexpected behavior with the pid file, but only
1219 used internally (never allowed to escape this class).
1220 """
mbligh36768f02008-02-22 18:28:33 +00001221
1222
showard170873e2009-01-07 00:22:26 +00001223 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001224 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001225 self._start_time = None
1226 self.pidfile_id = None
1227 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001228
1229
showard170873e2009-01-07 00:22:26 +00001230 def _add_nice_command(self, command, nice_level):
1231 if not nice_level:
1232 return command
1233 return ['nice', '-n', str(nice_level)] + command
1234
1235
    def _set_start_time(self):
        # Record when we started or attached to the process; consumed by the
        # pidfile-timeout check.
        self._start_time = time.time()
1238
1239
showard418785b2009-11-23 20:19:59 +00001240 def run(self, command, working_directory, num_processes, nice_level=None,
1241 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1242 username=None):
showard170873e2009-01-07 00:22:26 +00001243 assert command is not None
1244 if nice_level is not None:
1245 command = ['nice', '-n', str(nice_level)] + command
1246 self._set_start_time()
1247 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001248 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001249 num_processes=num_processes, log_file=log_file,
1250 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001251
1252
showarded2afea2009-07-07 20:54:07 +00001253 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001254 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001255 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001256 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001257 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001258 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001259 if num_processes is not None:
1260 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001261
1262
jadmanski0afbb632008-06-06 21:10:57 +00001263 def kill(self):
showard170873e2009-01-07 00:22:26 +00001264 if self.has_process():
1265 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001266
mbligh36768f02008-02-22 18:28:33 +00001267
showard170873e2009-01-07 00:22:26 +00001268 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001269 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001270 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001271
1272
showard170873e2009-01-07 00:22:26 +00001273 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001274 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001275 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001276 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001277
1278
showard170873e2009-01-07 00:22:26 +00001279 def _read_pidfile(self, use_second_read=False):
1280 assert self.pidfile_id is not None, (
1281 'You must call run() or attach_to_existing_process()')
1282 contents = _drone_manager.get_pidfile_contents(
1283 self.pidfile_id, use_second_read=use_second_read)
1284 if contents.is_invalid():
1285 self._state = drone_manager.PidfileContents()
1286 raise self._PidfileException(contents)
1287 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001288
1289
showard21baa452008-10-21 00:08:39 +00001290 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001291 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1292 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001293 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001294 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001295
1296
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile and decide whether the process
        has died without reporting.  The double read below deliberately
        closes the race where the process exits between our liveness check
        and the pidfile read.
        """
        if self.lost_process:
            # Already declared lost; nothing further to learn.
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001319
showard21baa452008-10-21 00:08:39 +00001320
    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            # exc itself is unused; the full traceback is emailed instead
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001332
1333
    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        # only give up once the process has had ample time to write its pid
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001343
1344
    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report failure with no test-level detail
        self._state.exit_status = 1
        self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001358
1359
    def exit_code(self):
        """Return the recorded exit status, or None if not yet exited."""
        self._get_pidfile_info()
        return self._state.exit_status
1363
1364
    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            # e.g. lost process or pidfile without a test count
            return -1
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001371
1372
    def try_copy_results_on_drone(self, **kwargs):
        """Copy results between paths on the drone, if we have a process.

        kwargs are forwarded to DroneManager.copy_results_on_drone.
        """
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1377
1378
    def try_copy_to_results_repository(self, source, **kwargs):
        """Copy source to the results repository, if we have a process.

        kwargs are forwarded to DroneManager.copy_to_results_repository.
        """
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)
1383
1384
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.  If the task succeeded.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        # mirror the task's ids so the dispatcher can index agents by them
        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Give the task a chance to make progress; mark us finished when
        the task reports it is done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """True once the underlying task has completed (or an abort stuck)."""
        return self.finished


    def abort(self):
        """Forward an abort request to the task."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            # tasks can choose to ignore aborts
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001438
showardd3dc1992009-04-22 21:01:40 +00001439
class AgentTask(object):
    """
    Base class for tasks run by Agents: wraps one autoserv-style process,
    tracked through a PidfileRunMonitor, with a started/done/success state
    machine driven by poll().
    """

    class _NullMonitor(object):
        """Stand-in monitor used when there is no paired process."""
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False
        self.started = False
        self.success = None
        self.aborted = False
        # PidfileRunMonitor for our process; set by run()/recover()
        self.monitor = None
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        """Populate host_ids/queue_entry_ids from queue entries, or from a
        bare host when there are no (non-None) queue entries."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Main entry point, called every scheduler cycle: start on first
        call, then tick until done."""
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check whether our process has exited; finish when it has."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run epilog().  Idempotent."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        """Full path of the command log, or None if no log was requested."""
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        """Preserve the command log in the results repository, if any."""
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)



    def start(self):
        """Run prolog() and launch the process on first call."""
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the process (if any) and mark the task aborted."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        """Return the execution path shared by all entries; asserts that
        every entry agrees on it."""
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        """Hand the entries to the results parser by setting status."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        """Hand the entries to the archiver by setting status."""
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        """If the paired process's results are gone, email a warning and
        fail this task.  Returns True when it's safe to proceed."""
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        """Launch this task's process via the drone manager."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username)


    def register_necessary_pidfiles(self):
        """Register our pidfile (and any paired pidfile) with the drone
        manager so their contents get refreshed each cycle."""
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Attach to an already-running process after a scheduler restart;
        if none is found, leave the task unstarted so it runs normally."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        """Sanity-check entry (and optionally host) statuses before starting;
        raises SchedulerError on any mismatch."""
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))
1709
1710
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render a single key/value pair in keyval-file format."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # bug fix: was `raise NotImplemented`, which raises a TypeError since
        # NotImplemented is not an exception class
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval to the job's keyval file on the drone running
        our process.  No-op if the process was lost."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <unix timestamp>) keyval pair for job."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        """Record the job completion time in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Write keyval_dict to keyval_path by attaching it to the execution
        directory before the job process starts."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write job-level keyvals to the standard keyval path."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write per-host keyvals (platform and labels) under host_keyvals/."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1760
1761
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # subclasses must override this with a models.SpecialTask.Task value
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask row this AgentTask executes.
        @param extra_command_args: extra autoserv command-line arguments
                identifying the kind of special task (e.g. ['-v']).
        """
        super(SpecialAgentTask, self).__init__()

        # bug fix: this assert was previously written as
        # `assert (expr, message)`, a two-element tuple that is always
        # truthy, so the check never fired
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        """Keyvals for a special task live in its own execution directory."""
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        """Build the autoserv invocation for this special task."""
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        """Login of the user who requested this task, or None."""
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # mark the DB row active and record host info for the results
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """Fail the associated queue entry, preserving this task's logs as
        the entry's results and queueing them for parse/archive."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001857
1858
class RepairTask(SpecialAgentTask):
    """Runs `autoserv -R` against a host and records the outcome."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """
        @param task: SpecialTask row; task.queue_entry, if set, is the queue
                entry to mark failed if this repair fails.
        """
        raw_protection = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name before handing it to autoserv
        normalized_protection = host_protections.Protection.get_attr_name(
                raw_protection)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', normalized_protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001895
1896
class PreJobTask(SpecialAgentTask):
    """
    Base class for special tasks that run before a job on a host (e.g.
    verify, cleanup).  On failure, preserves the task's debug log for the
    queue entry and either requeues the entry or schedules a repair.
    """

    def _copy_to_results_repository(self):
        """Copy this task's autoserv debug log into the queue entry's
        results, so the failure is visible with the job."""
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # if a repair for this entry was already tried and we failed
            # again, give up on the entry entirely
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        # schedule a repair of the host, tied to the queue entry if any
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001946
showard8fe93b52008-11-18 17:53:22 +00001947
class VerifyTask(PreJobTask):
    """Runs `autoserv -v` to verify a host before it runs a job."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host.  One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        pending_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001982
1983
class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        # recover_run_monitor is accepted for interface compatibility but
        # unused here
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup for a queue entry, either schedule a
        verify or move the entry straight to pending."""
        if not (self.queue_entry and self.success):
            return

        # hosts marked do-not-verify skip the verify step entirely
        wants_verify = (
                self.queue_entry.job.run_verify
                and self.host.protection
                        != host_protections.Protection.DO_NOT_VERIFY)
        if not wants_verify:
            self.queue_entry.on_pending()
            return

        entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=entry,
                task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2031
2032
showarda9545c02009-12-18 22:44:26 +00002033class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2034 """
2035 Common functionality for QueueTask and HostlessQueueTask
2036 """
    def __init__(self, queue_entries):
        """
        @param queue_entries: non-empty list of queue entries; they are
                assumed to all belong to the same job.
        """
        super(AbstractQueueTask, self).__init__()
        # all entries share one job, so take it from the first
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002041
2042
    def _keyval_path(self):
        # job-level keyvals live at the top of the execution directory
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002045
2046
jamesrenc44ae992010-02-19 00:12:54 +00002047 def _write_control_file(self, execution_path):
2048 control_path = _drone_manager.attach_file_to_execution(
2049 execution_path, self.job.control_file)
2050 return control_path
2051
2052
showardd1195652009-12-08 22:21:02 +00002053 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00002054 execution_path = self.queue_entries[0].execution_path()
2055 control_path = self._write_control_file(execution_path)
2056 hostnames = ','.join(entry.host.hostname
2057 for entry in self.queue_entries
2058 if not entry.is_hostless())
2059
2060 execution_tag = self.queue_entries[0].execution_tag()
2061 params = _autoserv_command_line(
2062 hostnames,
2063 ['-P', execution_tag, '-n',
2064 _drone_manager.absolute_path(control_path)],
2065 job=self.job, verbose=False)
2066
2067 if not self.job.is_server_job():
2068 params.append('-c')
2069
2070 return params
showardd1195652009-12-08 22:21:02 +00002071
2072
2073 @property
2074 def num_processes(self):
2075 return len(self.queue_entries)
2076
2077
2078 @property
2079 def owner_username(self):
2080 return self.job.owner
2081
2082
2083 def _working_directory(self):
2084 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002085
2086
jadmanski0afbb632008-06-06 21:10:57 +00002087 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002088 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002089 keyval_dict = self.job.keyval_dict()
2090 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002091 group_name = self.queue_entries[0].get_group_name()
2092 if group_name:
2093 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002094 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002095 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002096 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002097 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002098
2099
showard35162b02009-03-03 02:17:30 +00002100 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002101 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002102 _drone_manager.write_lines_to_file(error_file_path,
2103 [_LOST_PROCESS_ERROR])
2104
2105
showardd3dc1992009-04-22 21:01:40 +00002106 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002107 if not self.monitor:
2108 return
2109
showardd9205182009-04-27 20:09:55 +00002110 self._write_job_finished()
2111
showard35162b02009-03-03 02:17:30 +00002112 if self.monitor.lost_process:
2113 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002114
jadmanskif7fa2cc2008-10-01 14:13:23 +00002115
showardcbd74612008-11-19 21:42:02 +00002116 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002117 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002118 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002119 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002120 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002121
2122
jadmanskif7fa2cc2008-10-01 14:13:23 +00002123 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002124 if not self.monitor or not self.monitor.has_process():
2125 return
2126
jadmanskif7fa2cc2008-10-01 14:13:23 +00002127 # build up sets of all the aborted_by and aborted_on values
2128 aborted_by, aborted_on = set(), set()
2129 for queue_entry in self.queue_entries:
2130 if queue_entry.aborted_by:
2131 aborted_by.add(queue_entry.aborted_by)
2132 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2133 aborted_on.add(t)
2134
2135 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002136 # TODO(showard): this conditional is now obsolete, we just need to leave
2137 # it in temporarily for backwards compatibility over upgrades. delete
2138 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002139 assert len(aborted_by) <= 1
2140 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002141 aborted_by_value = aborted_by.pop()
2142 aborted_on_value = max(aborted_on)
2143 else:
2144 aborted_by_value = 'autotest_system'
2145 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002146
showarda0382352009-02-11 23:36:43 +00002147 self._write_keyval_after_job("aborted_by", aborted_by_value)
2148 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002149
showardcbd74612008-11-19 21:42:02 +00002150 aborted_on_string = str(datetime.datetime.fromtimestamp(
2151 aborted_on_value))
2152 self._write_status_comment('Job aborted by %s on %s' %
2153 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002154
2155
jadmanski0afbb632008-06-06 21:10:57 +00002156 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002157 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002158 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002159 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002160
2161
jadmanski0afbb632008-06-06 21:10:57 +00002162 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002163 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002164 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002165
2166
class QueueTask(AbstractQueueTask):
    """Runs a job's autoserv process against its assigned hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check entry/host statuses, then mark hosts running/dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)

        single_machine_job = (self.job.synch_count == 1
                              and len(self.queue_entries) == 1)
        if single_machine_job:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002198
2199
class HostlessQueueTask(AbstractQueueTask):
    """Runs a job that has a single hostless queue entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002214
2215
class PostJobTask(AgentTask):
    """
    Abstract base for agent tasks that run over a job's results after the
    main autoserv process has exited (crashinfo collection, parsing,
    archiving).  Subclasses must implement _generate_command() and
    _pidfile_name().
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor attached to the original autoserv process, used to read
        # its exit status from the pidfile
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        # in testing mode, run a trivial command instead of the real one
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        """Return the argv to run against results_dir."""
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if this job's queue entries were aborted.

        All entries are expected to agree; on inconsistency, send a warning
        email and assume aborted rather than crashing.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # BUGFIX: the join must iterate over all queue entries;
                # previously it joined the characters of a single string.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: '
                        + entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Compute the terminal HostQueueEntry status for this job."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2291
2292
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
            queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo argv over all hosts."""
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        # one process slot per queue entry
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
            self.queue_entries,
            allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
            allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule a cleanup (reboot) for each host, or mark it ready,
        based on the job's reboot_after policy and the final job status."""
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # no autoserv process ever ran -- treat as a failed job
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                    host=models.Host.objects.get(id=queue_entry.host.id),
                    task=models.SpecialTask.Task.CLEANUP,
                    requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002378
2379
class SelfThrottledPostJobTask(PostJobTask):
    """
    PostJobTask variant that enforces its own global cap on concurrently
    running processes of its type.
    """
    # class-wide count of processes of this type currently running
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        """Subclasses return the maximum allowed concurrent processes."""
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # until our process has actually started, keep retrying the start on
        # every tick; once started, fall back to the default behavior
        if not self._process_started():
            self._try_starting_process()
            return
        super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # don't actually run unless a slot is available
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # actually run the command
            super(SelfThrottledPostJobTask, self).run()
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002438
showard21baa452008-10-21 00:08:39 +00002439
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO parser over a finished job's results."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        allowed = (models.HostQueueEntry.Status.PARSING,)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed)

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002478
2479
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the results-archiving control file for a job."""

    # marker file written into the results dir when archiving fails
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(
                queue_entries, log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        allowed = (models.HostQueueEntry.Status.ARCHIVING,)
        self._check_queue_entry_statuses(self.queue_entries,
                                         allowed_hqe_statuses=allowed)

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        archiver = self._paired_with_monitor()
        if not self.success and archiver.has_process():
            # record the failure next to the results
            failure_lines = ['Archiving failed with exit code %s'
                             % self.monitor.exit_code()]
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            _drone_manager.write_lines_to_file(
                    failed_file, failure_lines,
                    paired_with_process=archiver.get_process())
        self._set_all_statuses(self._final_status())
2527
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this module.
    main()