blob: 4b18d2ca28f800719ba629a80bf8f013d109a7ed [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
# Default to the autotest checkout this file lives in; overridden below by
# the AUTOTEST_DIR environment variable when set.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# NOTE: was os.environ.has_key('AUTOTEST_DIR'); dict.has_key() is deprecated
# and the `in` operator is the equivalent, portable spelling.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level scheduler state; _db and _drone_manager are populated during
# initialize(), _shutdown is flipped by the SIGINT handler.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_drone_manager = None
60
def _get_pidfile_timeout_secs():
    """Return the number of seconds to wait for autoserv's pidfile.

    Reads 'pidfile_timeout_mins' from the scheduler section of the global
    config and converts it to seconds.
    """
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
# Site hook returning any site-specific metahost schedulers to consider.
# Falls back to a function returning an empty tuple when no site module
# (autotest_lib.scheduler.site_metahost_scheduler) is available.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
def main():
    """Entry point: run the scheduler with top-level exception handling.

    SystemExit is re-raised untouched so deliberate exits keep their status;
    any other escaping exception is logged before propagating.  The pid file
    is removed on the way out no matter how we exit.
    """
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000088
89
90def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000091 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000092
showard136e6dc2009-06-10 19:38:49 +000093 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000094 parser = optparse.OptionParser(usage)
95 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
96 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000097 parser.add_option('--test', help='Indicate that scheduler is under ' +
98 'test and should use dummy autoserv and no parsing',
99 action='store_true')
100 (options, args) = parser.parse_args()
101 if len(args) != 1:
102 parser.print_usage()
103 return
mbligh36768f02008-02-22 18:28:33 +0000104
showard5613c662009-06-08 23:30:33 +0000105 scheduler_enabled = global_config.global_config.get_config_value(
106 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
107
108 if not scheduler_enabled:
109 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
110 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +0000111 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000112 sys.exit(1)
113
jadmanski0afbb632008-06-06 21:10:57 +0000114 global RESULTS_DIR
115 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000116
mbligh83c1e9e2009-05-01 23:10:41 +0000117 site_init = utils.import_site_function(__file__,
118 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
119 _site_init_monitor_db_dummy)
120 site_init()
121
showardcca334f2009-03-12 20:38:34 +0000122 # Change the cwd while running to avoid issues incase we were launched from
123 # somewhere odd (such as a random NFS home directory of the person running
124 # sudo to launch us as the appropriate user).
125 os.chdir(RESULTS_DIR)
126
showardc85c21b2008-11-24 22:17:37 +0000127
jadmanski0afbb632008-06-06 21:10:57 +0000128 if options.test:
129 global _autoserv_path
130 _autoserv_path = 'autoserv_dummy'
131 global _testing_mode
132 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000133
jamesrenc44ae992010-02-19 00:12:54 +0000134 server = status_server.StatusServer()
showardd1ee1dd2009-01-07 21:33:08 +0000135 server.start()
136
jadmanski0afbb632008-06-06 21:10:57 +0000137 try:
jamesrenc44ae992010-02-19 00:12:54 +0000138 initialize()
showardc5afc462009-01-13 00:09:39 +0000139 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000140 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000141
jadmanski0afbb632008-06-06 21:10:57 +0000142 while not _shutdown:
143 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000144 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000145 except:
showard170873e2009-01-07 00:22:26 +0000146 email_manager.manager.log_stacktrace(
147 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000148
showard170873e2009-01-07 00:22:26 +0000149 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000150 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000151 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000152 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000153
154
def setup_logging():
    """Configure scheduler logging.

    The log directory and logfile name may be overridden through the
    AUTOTEST_SCHEDULER_LOG_DIR / AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; when unset, SchedulerLoggingConfig defaults apply.
    """
    env = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=env.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=env.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
161
162
def handle_sigint(signum, frame):
    """SIGINT handler: request that the main loop exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def initialize():
    """One-time scheduler startup: pid file, DB, signals, drones.

    Aborts (exit status 1) if another monitor_db instance already holds the
    pid file.  In testing mode the web database is overridden with the
    stress-test database before connecting.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # NOTE(review): this local `drones` (a comma-separated hostname string
    # from the config) shadows the imported `drones` module for the rest of
    # this function — confusing but harmless here.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000206
207
def initialize_globals():
    # Bind the module-level _drone_manager (used throughout this file) to
    # the shared instance returned by drone_manager.instance().
    global _drone_manager
    _drone_manager = drone_manager.instance()
211
212
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    # Base invocation: autoserv in profiler-capable mode with the drone
    # working directory as the results root.
    argv = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        argv.extend(['-m', machines])
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        argv.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        argv.append('--verbose')
    return argv + extra_args
237
238
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
241
242
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for all unlocked, Ready, inactive hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id iterable as a comma-separated string for SQL IN ()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (with %s filled by id_list) and map left id -> right ids."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)} (swapped if flip)."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the ACL group ids of the job owner."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to host ids it may not run on."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the label ids it depends on."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the ACL group ids it belongs to."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) mappings for host_ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover state on startup."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild all per-tick caches for the given pending queue entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the per-tick callback to each metahost scheduler."""
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a fresh set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        # NOTE: raises KeyError if the host is not cached under this label.
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Remove and return a host from the available pool (KeyError if absent)."""
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a fresh set of host ids the entry's job may not use."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label the job didn't ask for."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE(review): despite the docstring, this logs rather than
            # raises, then arbitrarily returns one of the group ids.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency and atomic-group checks."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                        job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # Truthy only when the host is still in the available pool AND marked
        # invalid; unknown hosts yield None (falsy).
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Assign a host for the entry, or hand it off to a metahost scheduler.

        @returns A Host for non-metahost entries (or None), None for metahost
                entries handled by a scheduler.
        @raises SchedulerError when no metahost scheduler accepts the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
644
645
class Dispatcher(object):
    """
    Main driver of the scheduler loop.

    Owns the list of active Agents (each wrapping one AgentTask), maps them
    to the hosts and host queue entries they touch, and advances everything
    one step per tick().  Also performs startup recovery: re-attaching to
    autoserv processes that were running when the scheduler last exited.
    """
    def __init__(self):
        # Active Agent objects; drained in _handle_agents() as they finish.
        self._agents = []
        # NOTE(review): _last_clean_time is not referenced anywhere else in
        # this class - looks vestigial; cleanup timing now lives in the
        # monitor_db_cleanup objects below.  Confirm before removing.
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
                _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id -> set of Agents using that host, and queue entry id ->
        # set of Agents; used for conflict detection and abort lookups.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collector stat dumps, in seconds
        # (config value is in minutes; default six hours).
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        """
        One-time startup: run cleanup initializers, then recover processes
        (always) and hosts (unless disabled) left over from a previous run.

        @param recover_hosts: when True, also reverify dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
        """
        Run one iteration of the scheduler loop.  The phase order below is
        significant: state is refreshed and cleaned before new work is
        scheduled, and agents run before drone actions are flushed.
        """
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        # Django caches queries for debugging; drop them so memory use
        # stays bounded in this long-running process.
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        """Give both cleanup objects a chance to run, if they are due."""
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        """
        Periodically force a full gc pass and log collector statistics.
        No-ops until _seconds_between_garbage_stats have elapsed.
        """
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        """Index agent under every id in object_ids (host or HQE ids)."""
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        """Reverse of _register_agent_for_ids for a finished agent."""
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        """
        Wrap agent_task in an Agent, register it under all host and queue
        entry ids it touches, and queue it for execution.
        """
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        """Drop a finished agent from the active list and both indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        """True if host has a queued (not yet started) SpecialTask."""
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        """
        Startup recovery: rebuild AgentTasks for work that was in flight
        when the scheduler last died, and clean up anything unaccounted for.
        """
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        """AgentTasks for all active HQEs plus all active SpecialTasks."""
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        """
        Build one AgentTask per unhandled group of active host queue
        entries.  Synchronous jobs share one task across their entries.
        """
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        """AgentTasks for all incomplete SpecialTasks with the given state."""
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        """
        Raise (via _assert_host_has_no_agent) if any entry's host is
        already in use by another agent.  Parsing/Archiving entries no
        longer occupy their host, so they are exempt.
        """
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            # Match on the class's declared TASK_TYPE tag.
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        """Tell each recovered task to register pidfiles before refresh."""
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        """
        Recover each task's process and start it; any autoserv process not
        claimed by a recovered task is treated as an orphan.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        """Yield HQEs in the given status that no agent is handling yet."""
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                                 % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        """
        Email about any orphan processes left after recovery; optionally
        abort the scheduler if die_on_orphans is configured.
        """
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        """Re-fire on_pending() for Pending entries with no agent."""
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        """
        Fail startup if any Verifying HQE has no incomplete Cleanup/Verify
        SpecialTask to carry it forward - that state cannot self-heal.
        """
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        """
        Schedule a Cleanup SpecialTask for every unlocked, valid host
        matching the SQL fragment 'where' that has no agent and no pending
        special task.

        @param print_message: log format with one %s for the hostname, or
                None/'' to suppress logging.
        """
        full_where='locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        """Start a queue entry that needs no host (e.g. server-side job)."""
        self.add_agent_task(HostlessQueueTask(queue_entry))


    def _schedule_new_jobs(self):
        """
        Assign hosts to Queued entries and kick off their pre-job tasks.
        Hostless and atomic-group entries take their own paths.
        """
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        """Create agents for active entries picked up since last tick."""
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        """Create delayed-callback tasks for entries in Waiting status."""
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        """Kick off the pre-job special tasks (verify/cleanup) for an HQE."""
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        """Abort agents and entries for every HQE marked aborted."""
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Throttling policy for starting agents: global process limits plus a
        per-tick cap, with an exemption so a single large agent can never
        be starved forever.
        """
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username)
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
        """
        Advance every agent one step, starting eligible new ones (subject
        to throttling) and reaping finished ones.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        """
        Instantiate jobs for due RecurringRun templates, then reschedule or
        delete each template depending on its remaining loop_count.
        """
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is fetched but never used below;
            # confirm whether create_new_job picks dependencies up from
            # 'options' instead.
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # Best-effort: log and continue with remaining templates.
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1212
1213
showard170873e2009-01-07 00:22:26 +00001214class PidfileRunMonitor(object):
1215 """
1216 Client must call either run() to start a new process or
1217 attach_to_existing_process().
1218 """
mbligh36768f02008-02-22 18:28:33 +00001219
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class; see
        _get_pidfile_info, which converts it into a notify email).
        """
mbligh36768f02008-02-22 18:28:33 +00001225
1226
showard170873e2009-01-07 00:22:26 +00001227 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001228 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001229 self._start_time = None
1230 self.pidfile_id = None
1231 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001232
1233
showard170873e2009-01-07 00:22:26 +00001234 def _add_nice_command(self, command, nice_level):
1235 if not nice_level:
1236 return command
1237 return ['nice', '-n', str(nice_level)] + command
1238
1239
1240 def _set_start_time(self):
1241 self._start_time = time.time()
1242
1243
showard418785b2009-11-23 20:19:59 +00001244 def run(self, command, working_directory, num_processes, nice_level=None,
1245 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1246 username=None):
showard170873e2009-01-07 00:22:26 +00001247 assert command is not None
1248 if nice_level is not None:
1249 command = ['nice', '-n', str(nice_level)] + command
1250 self._set_start_time()
1251 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001252 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001253 num_processes=num_processes, log_file=log_file,
1254 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001255
1256
showarded2afea2009-07-07 20:54:07 +00001257 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001258 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001259 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001260 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001261 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001262 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001263 if num_processes is not None:
1264 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001265
1266
jadmanski0afbb632008-06-06 21:10:57 +00001267 def kill(self):
showard170873e2009-01-07 00:22:26 +00001268 if self.has_process():
1269 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001270
mbligh36768f02008-02-22 18:28:33 +00001271
showard170873e2009-01-07 00:22:26 +00001272 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001273 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001274 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001275
1276
    def get_process(self):
        """
        Return the process for the monitored pidfile.

        Callers should first check has_process(); this asserts that a
        process has actually been found.
        """
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process
mblighbb421852008-03-11 22:36:16 +00001281
1282
    def _read_pidfile(self, use_second_read=False):
        """
        Refresh self._state from the drone manager's view of the pidfile.

        @param use_second_read: request the drone manager's second (re-read)
                copy of the contents; used to close the race where the
                process exits between our first read and the liveness check.
        @raises _PidfileException: if the contents are invalid (state is
                reset to empty first).
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001292
1293
    def _handle_pidfile_error(self, error, message=''):
        """
        Report a pidfile problem by email and give up on the process.

        @param error: short description, used as the email subject and
                prepended to the body.
        @param message: extra detail (e.g. a traceback) for the body.
        """
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        # on_lost_process is defined later in this class (outside this
        # excerpt); it marks the process as lost so we stop polling it.
        self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001299
1300
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state, detecting processes that died without
        recording an exit status.  May raise _PidfileException (handled by
        the _get_pidfile_info wrapper).
        """
        # Once a process is lost there is nothing further to learn.
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001323
showard21baa452008-10-21 00:08:39 +00001324
1325 def _get_pidfile_info(self):
1326 """\
1327 After completion, self._state will contain:
1328 pid=None, exit_status=None if autoserv has not yet run
1329 pid!=None, exit_status=None if autoserv is running
1330 pid!=None, exit_status!=None if autoserv has completed
1331 """
1332 try:
1333 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001334 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001335 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001336
1337
showard170873e2009-01-07 00:22:26 +00001338 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001339 """\
1340 Called when no pidfile is found or no pid is in the pidfile.
1341 """
showard170873e2009-01-07 00:22:26 +00001342 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001343 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001344 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001345 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001346 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001347
1348
showard35162b02009-03-03 02:17:30 +00001349 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001350 """\
1351 Called when autoserv has exited without writing an exit status,
1352 or we've timed out waiting for autoserv to write a pid to the
1353 pidfile. In either case, we just return failure and the caller
1354 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001355
showard170873e2009-01-07 00:22:26 +00001356 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001357 """
1358 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001359 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001360 self._state.exit_status = 1
1361 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001362
1363
jadmanski0afbb632008-06-06 21:10:57 +00001364 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001365 self._get_pidfile_info()
1366 return self._state.exit_status
1367
1368
1369 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001370 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001371 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001372 if self._state.num_tests_failed is None:
1373 return -1
showard21baa452008-10-21 00:08:39 +00001374 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001375
1376
showardcdaeae82009-08-31 18:32:48 +00001377 def try_copy_results_on_drone(self, **kwargs):
1378 if self.has_process():
1379 # copy results logs into the normal place for job results
1380 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1381
1382
1383 def try_copy_to_results_repository(self, source, **kwargs):
1384 if self.has_process():
1385 _drone_manager.copy_to_results_repository(self.get_process(),
1386 source, **kwargs)
1387
1388
mbligh36768f02008-02-22 18:28:33 +00001389class Agent(object):
showard77182562009-06-10 00:16:05 +00001390 """
showard8cc058f2009-09-08 16:26:33 +00001391 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001392
1393 The following methods are required on all task objects:
1394 poll() - Called periodically to let the task check its status and
1395 update its internal state. If the task succeeded.
1396 is_done() - Returns True if the task is finished.
1397 abort() - Called when an abort has been requested. The task must
1398 set its aborted attribute to True if it actually aborted.
1399
1400 The following attributes are required on all task objects:
1401 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001402 success - bool, True if this task succeeded.
1403 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1404 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001405 """
1406
1407
showard418785b2009-11-23 20:19:59 +00001408 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001409 """
showard8cc058f2009-09-08 16:26:33 +00001410 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001411 """
showard8cc058f2009-09-08 16:26:33 +00001412 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001413
showard77182562009-06-10 00:16:05 +00001414 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001415 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001416
showard8cc058f2009-09-08 16:26:33 +00001417 self.queue_entry_ids = task.queue_entry_ids
1418 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001419
showard8cc058f2009-09-08 16:26:33 +00001420 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001421 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001422
1423
jadmanski0afbb632008-06-06 21:10:57 +00001424 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001425 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001426 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001427 self.task.poll()
1428 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001429 self.finished = True
showardec113162008-05-08 00:52:49 +00001430
1431
jadmanski0afbb632008-06-06 21:10:57 +00001432 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001433 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001434
1435
showardd3dc1992009-04-22 21:01:40 +00001436 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001437 if self.task:
1438 self.task.abort()
1439 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001440 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001441 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001442
showardd3dc1992009-04-22 21:01:40 +00001443
mbligh36768f02008-02-22 18:28:33 +00001444class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001445 class _NullMonitor(object):
1446 pidfile_id = None
1447
1448 def has_process(self):
1449 return True
1450
1451
1452 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001453 """
showardd1195652009-12-08 22:21:02 +00001454 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001455 """
jadmanski0afbb632008-06-06 21:10:57 +00001456 self.done = False
showardd1195652009-12-08 22:21:02 +00001457 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001458 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001459 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001460 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001461 self.queue_entry_ids = []
1462 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001463 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001464
1465
1466 def _set_ids(self, host=None, queue_entries=None):
1467 if queue_entries and queue_entries != [None]:
1468 self.host_ids = [entry.host.id for entry in queue_entries]
1469 self.queue_entry_ids = [entry.id for entry in queue_entries]
1470 else:
1471 assert host
1472 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001473
1474
jadmanski0afbb632008-06-06 21:10:57 +00001475 def poll(self):
showard08a36412009-05-05 01:01:13 +00001476 if not self.started:
1477 self.start()
showardd1195652009-12-08 22:21:02 +00001478 if not self.done:
1479 self.tick()
showard08a36412009-05-05 01:01:13 +00001480
1481
1482 def tick(self):
showardd1195652009-12-08 22:21:02 +00001483 assert self.monitor
1484 exit_code = self.monitor.exit_code()
1485 if exit_code is None:
1486 return
mbligh36768f02008-02-22 18:28:33 +00001487
showardd1195652009-12-08 22:21:02 +00001488 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001489 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001490
1491
jadmanski0afbb632008-06-06 21:10:57 +00001492 def is_done(self):
1493 return self.done
mbligh36768f02008-02-22 18:28:33 +00001494
1495
jadmanski0afbb632008-06-06 21:10:57 +00001496 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001497 if self.done:
showardd1195652009-12-08 22:21:02 +00001498 assert self.started
showard08a36412009-05-05 01:01:13 +00001499 return
showardd1195652009-12-08 22:21:02 +00001500 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001501 self.done = True
1502 self.success = success
1503 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001504
1505
jadmanski0afbb632008-06-06 21:10:57 +00001506 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001507 """
1508 To be overridden.
1509 """
showarded2afea2009-07-07 20:54:07 +00001510 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001511 self.register_necessary_pidfiles()
1512
1513
1514 def _log_file(self):
1515 if not self._log_file_name:
1516 return None
1517 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001518
mbligh36768f02008-02-22 18:28:33 +00001519
jadmanski0afbb632008-06-06 21:10:57 +00001520 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001521 log_file = self._log_file()
1522 if self.monitor and log_file:
1523 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001524
1525
jadmanski0afbb632008-06-06 21:10:57 +00001526 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001527 """
1528 To be overridden.
1529 """
jadmanski0afbb632008-06-06 21:10:57 +00001530 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001531 logging.info("%s finished with success=%s", type(self).__name__,
1532 self.success)
1533
mbligh36768f02008-02-22 18:28:33 +00001534
1535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001537 if not self.started:
1538 self.prolog()
1539 self.run()
1540
1541 self.started = True
1542
1543
1544 def abort(self):
1545 if self.monitor:
1546 self.monitor.kill()
1547 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001548 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001549 self.cleanup()
1550
1551
showarded2afea2009-07-07 20:54:07 +00001552 def _get_consistent_execution_path(self, execution_entries):
1553 first_execution_path = execution_entries[0].execution_path()
1554 for execution_entry in execution_entries[1:]:
1555 assert execution_entry.execution_path() == first_execution_path, (
1556 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1557 execution_entry,
1558 first_execution_path,
1559 execution_entries[0]))
1560 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001561
1562
showarded2afea2009-07-07 20:54:07 +00001563 def _copy_results(self, execution_entries, use_monitor=None):
1564 """
1565 @param execution_entries: list of objects with execution_path() method
1566 """
showard6d1c1432009-08-20 23:30:39 +00001567 if use_monitor is not None and not use_monitor.has_process():
1568 return
1569
showarded2afea2009-07-07 20:54:07 +00001570 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001571 if use_monitor is None:
1572 assert self.monitor
1573 use_monitor = self.monitor
1574 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001575 execution_path = self._get_consistent_execution_path(execution_entries)
1576 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001577 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001578
showarda1e74b32009-05-12 17:32:04 +00001579
1580 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001581 for queue_entry in queue_entries:
1582 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001583
1584
mbligh4608b002010-01-05 18:22:35 +00001585 def _archive_results(self, queue_entries):
1586 for queue_entry in queue_entries:
1587 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001588
1589
showardd1195652009-12-08 22:21:02 +00001590 def _command_line(self):
1591 """
1592 Return the command line to run. Must be overridden.
1593 """
1594 raise NotImplementedError
1595
1596
1597 @property
1598 def num_processes(self):
1599 """
1600 Return the number of processes forked by this AgentTask's process. It
1601 may only be approximate. To be overridden if necessary.
1602 """
1603 return 1
1604
1605
1606 def _paired_with_monitor(self):
1607 """
1608 If this AgentTask's process must run on the same machine as some
1609 previous process, this method should be overridden to return a
1610 PidfileRunMonitor for that process.
1611 """
1612 return self._NullMonitor()
1613
1614
1615 @property
1616 def owner_username(self):
1617 """
1618 Return login of user responsible for this task. May be None. Must be
1619 overridden.
1620 """
1621 raise NotImplementedError
1622
1623
1624 def _working_directory(self):
1625 """
1626 Return the directory where this AgentTask's process executes. Must be
1627 overridden.
1628 """
1629 raise NotImplementedError
1630
1631
1632 def _pidfile_name(self):
1633 """
1634 Return the name of the pidfile this AgentTask's process uses. To be
1635 overridden if necessary.
1636 """
jamesrenc44ae992010-02-19 00:12:54 +00001637 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001638
1639
1640 def _check_paired_results_exist(self):
1641 if not self._paired_with_monitor().has_process():
1642 email_manager.manager.enqueue_notify_email(
1643 'No paired results in task',
1644 'No paired results in task %s at %s'
1645 % (self, self._paired_with_monitor().pidfile_id))
1646 self.finished(False)
1647 return False
1648 return True
1649
1650
1651 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001652 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001653 self.monitor = PidfileRunMonitor()
1654
1655
1656 def run(self):
1657 if not self._check_paired_results_exist():
1658 return
1659
1660 self._create_monitor()
1661 self.monitor.run(
1662 self._command_line(), self._working_directory(),
1663 num_processes=self.num_processes,
1664 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1665 pidfile_name=self._pidfile_name(),
1666 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1667 username=self.owner_username)
1668
1669
1670 def register_necessary_pidfiles(self):
1671 pidfile_id = _drone_manager.get_pidfile_id_from(
1672 self._working_directory(), self._pidfile_name())
1673 _drone_manager.register_pidfile(pidfile_id)
1674
1675 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1676 if paired_pidfile_id:
1677 _drone_manager.register_pidfile(paired_pidfile_id)
1678
1679
1680 def recover(self):
1681 if not self._check_paired_results_exist():
1682 return
1683
1684 self._create_monitor()
1685 self.monitor.attach_to_existing_process(
1686 self._working_directory(), pidfile_name=self._pidfile_name(),
1687 num_processes=self.num_processes)
1688 if not self.monitor.has_process():
1689 # no process to recover; wait to be started normally
1690 self.monitor = None
1691 return
1692
1693 self.started = True
1694 logging.info('Recovering process %s for %s at %s'
1695 % (self.monitor.get_process(), type(self).__name__,
1696 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001697
1698
mbligh4608b002010-01-05 18:22:35 +00001699 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1700 allowed_host_statuses=None):
1701 for entry in queue_entries:
1702 if entry.status not in allowed_hqe_statuses:
1703 raise SchedulerError('Queue task attempting to start '
1704 'entry with invalid status %s: %s'
1705 % (entry.status, entry))
1706 invalid_host_status = (
1707 allowed_host_statuses is not None
1708 and entry.host.status not in allowed_host_statuses)
1709 if invalid_host_status:
1710 raise SchedulerError('Queue task attempting to start on queue '
1711 'entry with invalid host status %s: %s'
1712 % (entry.host.status, entry))
1713
1714
showardd9205182009-04-27 20:09:55 +00001715class TaskWithJobKeyvals(object):
1716 """AgentTask mixin providing functionality to help with job keyval files."""
1717 _KEYVAL_FILE = 'keyval'
1718 def _format_keyval(self, key, value):
1719 return '%s=%s' % (key, value)
1720
1721
1722 def _keyval_path(self):
1723 """Subclasses must override this"""
1724 raise NotImplemented
1725
1726
1727 def _write_keyval_after_job(self, field, value):
1728 assert self.monitor
1729 if not self.monitor.has_process():
1730 return
1731 _drone_manager.write_lines_to_file(
1732 self._keyval_path(), [self._format_keyval(field, value)],
1733 paired_with_process=self.monitor.get_process())
1734
1735
1736 def _job_queued_keyval(self, job):
1737 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1738
1739
1740 def _write_job_finished(self):
1741 self._write_keyval_after_job("job_finished", int(time.time()))
1742
1743
showarddb502762009-09-09 15:31:20 +00001744 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1745 keyval_contents = '\n'.join(self._format_keyval(key, value)
1746 for key, value in keyval_dict.iteritems())
1747 # always end with a newline to allow additional keyvals to be written
1748 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001749 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001750 keyval_contents,
1751 file_path=keyval_path)
1752
1753
1754 def _write_keyvals_before_job(self, keyval_dict):
1755 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1756
1757
1758 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001759 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001760 host.hostname)
1761 platform, all_labels = host.platform_and_labels()
1762 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1763 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1764
1765
showard8cc058f2009-09-08 16:26:33 +00001766class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001767 """
1768 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1769 """
1770
1771 TASK_TYPE = None
1772 host = None
1773 queue_entry = None
1774
showardd1195652009-12-08 22:21:02 +00001775 def __init__(self, task, extra_command_args):
1776 super(SpecialAgentTask, self).__init__()
1777
showarded2afea2009-07-07 20:54:07 +00001778 assert (self.TASK_TYPE is not None,
1779 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001780
jamesrenc44ae992010-02-19 00:12:54 +00001781 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001782 self.queue_entry = None
1783 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001784 self.queue_entry = scheduler_models.HostQueueEntry(
1785 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001786
showarded2afea2009-07-07 20:54:07 +00001787 self.task = task
1788 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001789
1790
showard8cc058f2009-09-08 16:26:33 +00001791 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001792 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1793
1794
1795 def _command_line(self):
1796 return _autoserv_command_line(self.host.hostname,
1797 self._extra_command_args,
1798 queue_entry=self.queue_entry)
1799
1800
1801 def _working_directory(self):
1802 return self.task.execution_path()
1803
1804
1805 @property
1806 def owner_username(self):
1807 if self.task.requested_by:
1808 return self.task.requested_by.login
1809 return None
showard8cc058f2009-09-08 16:26:33 +00001810
1811
showarded2afea2009-07-07 20:54:07 +00001812 def prolog(self):
1813 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001814 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001815 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001816
1817
showardde634ee2009-01-30 01:44:24 +00001818 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001819 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001820
showard2fe3f1d2009-07-06 20:19:11 +00001821 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001822 return # don't fail metahost entries, they'll be reassigned
1823
showard2fe3f1d2009-07-06 20:19:11 +00001824 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001825 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001826 return # entry has been aborted
1827
showard2fe3f1d2009-07-06 20:19:11 +00001828 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001829 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001830 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001831 self._write_keyval_after_job(queued_key, queued_time)
1832 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001833
showard8cc058f2009-09-08 16:26:33 +00001834 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001835 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001836 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001837 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001838
showard8cc058f2009-09-08 16:26:33 +00001839 pidfile_id = _drone_manager.get_pidfile_id_from(
1840 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001841 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001842 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001843
1844 if self.queue_entry.job.parse_failed_repair:
1845 self._parse_results([self.queue_entry])
1846 else:
1847 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001848
1849
1850 def cleanup(self):
1851 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001852
1853 # We will consider an aborted task to be "Failed"
1854 self.task.finish(bool(self.success))
1855
showardf85a0b72009-10-07 20:48:45 +00001856 if self.monitor:
1857 if self.monitor.has_process():
1858 self._copy_results([self.task])
1859 if self.monitor.pidfile_id is not None:
1860 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001861
1862
class RepairTask(SpecialAgentTask):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection_name = host_protections.Protection.get_string(
                task.host.protection)
        # normalize the protection name
        protection_name = host_protections.Protection.get_attr_name(
                protection_name)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection_name])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if not self.success:
            # repair failed: mark the host, and fail the triggering entry
            self.host.set_status(models.Host.Status.REPAIR_FAILED)
            if self.queue_entry:
                self._fail_queue_entry()
            return

        self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001899
1900
showarded2afea2009-07-07 20:54:07 +00001901class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001902 def _copy_to_results_repository(self):
1903 if not self.queue_entry or self.queue_entry.meta_host:
1904 return
1905
1906 self.queue_entry.set_execution_subdir()
1907 log_name = os.path.basename(self.task.execution_path())
1908 source = os.path.join(self.task.execution_path(), 'debug',
1909 'autoserv.DEBUG')
1910 destination = os.path.join(
1911 self.queue_entry.execution_path(), log_name)
1912
1913 self.monitor.try_copy_to_results_repository(
1914 source, destination_path=destination)
1915
1916
showard170873e2009-01-07 00:22:26 +00001917 def epilog(self):
1918 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001919
showard775300b2009-09-09 15:30:50 +00001920 if self.success:
1921 return
showard8fe93b52008-11-18 17:53:22 +00001922
showard775300b2009-09-09 15:30:50 +00001923 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001924
showard775300b2009-09-09 15:30:50 +00001925 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001926 # effectively ignore failure for these hosts
1927 self.success = True
showard775300b2009-09-09 15:30:50 +00001928 return
1929
1930 if self.queue_entry:
1931 self.queue_entry.requeue()
1932
1933 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001934 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001935 queue_entry__id=self.queue_entry.id):
1936 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1937 self._fail_queue_entry()
1938 return
1939
showard9bb960b2009-11-19 01:02:11 +00001940 queue_entry = models.HostQueueEntry.objects.get(
1941 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001942 else:
1943 queue_entry = None
1944
1945 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001946 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001947 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001948 queue_entry=queue_entry,
1949 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001950
showard8fe93b52008-11-18 17:53:22 +00001951
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        stale_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        stale_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001986
1987
mbligh4608b002010-01-05 18:22:35 +00001988class CleanupTask(PreJobTask):
1989 # note this can also run post-job, but when it does, it's running standalone
1990 # against the host (not related to the job), so it's not considered a
1991 # PostJobTask
1992
1993 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1994
1995
1996 def __init__(self, task, recover_run_monitor=None):
1997 super(CleanupTask, self).__init__(task, ['--cleanup'])
1998 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1999
2000
2001 def prolog(self):
2002 super(CleanupTask, self).prolog()
2003 logging.info("starting cleanup task for host: %s", self.host.hostname)
2004 self.host.set_status(models.Host.Status.CLEANING)
2005 if self.queue_entry:
2006 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2007
2008
2009 def _finish_epilog(self):
2010 if not self.queue_entry or not self.success:
2011 return
2012
2013 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2014 should_run_verify = (
2015 self.queue_entry.job.run_verify
2016 and self.host.protection != do_not_verify_protection)
2017 if should_run_verify:
2018 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2019 models.SpecialTask.objects.create(
2020 host=models.Host.objects.get(id=self.host.id),
2021 queue_entry=entry,
2022 task=models.SpecialTask.Task.VERIFY)
2023 else:
2024 self.queue_entry.on_pending()
2025
2026
2027 def epilog(self):
2028 super(CleanupTask, self).epilog()
2029
2030 if self.success:
2031 self.host.update_field('dirty', 0)
2032 self.host.set_status(models.Host.Status.READY)
2033
2034 self._finish_epilog()
2035
2036
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask
    """
    def __init__(self, queue_entries):
        super(AbstractQueueTask, self).__init__()
        # all entries are assumed to belong to the same job; the first one is
        # representative
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # keyvals live in the job's execution (results) directory
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution dir on the drone;
        returns the drone-side path of the attached file."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line for running this job."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # hostless entries contribute no machine to the -m list
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self.job, verbose=False)

        if not self.job.is_server_job():
            # -c tells autoserv this is a client-side control file
            params.append('-c')

        return params


    @property
    def num_processes(self):
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and mark all entries as running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        # record the host group name, if the entries were grouped by label
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # leave a marker in the results dir explaining the missing process
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Write end-of-job bookkeeping; no-op if the task never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
                os.path.join(self._working_directory(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002169
2170
class QueueTask(AbstractQueueTask):
    """Runs autoserv for a job's queue entries on their assigned hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        # mark every participating host as busy and dirty
        for entry in self.queue_entries:
            host = entry.host
            self._write_host_keyvals(host)
            host.set_status(models.Host.Status.RUNNING)
            host.update_field('dirty', 1)

        is_single_machine = (self.job.synch_count == 1
                             and len(self.queue_entries) == 1)
        if is_single_machine:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        # once autoserv finishes, entries move on to crashinfo gathering
        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002202
2203
class HostlessQueueTask(AbstractQueueTask):
    """Runs a job's single hostless queue entry (no machines involved)."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # use a fixed execution subdir, since there is no host to name it after
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # hostless jobs skip gathering and go straight to parsing
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002218
2219
class PostJobTask(AgentTask):
    """
    Base class for tasks that run over a job's results after autoserv exits:
    crashinfo gathering, final reparse and results archiving.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        # monitor attached to the (now finished) autoserv process, used to
        # read its exit status and pidfile state
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """Return True if the job's queue entries were aborted.

        All entries are expected to agree; if they don't, a notification
        email is enqueued and True is assumed.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # describe every entry's abort state in the notification.
                # (previously a single formatted string was passed to join(),
                # which joined its *characters* with ', ')
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job outcome to the final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task.  post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2295
2296
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run autoserv in --collect-crashinfo mode against all job hosts,
        # reusing the existing results directory
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self.queue_entries)
        return [_autoserv_path , '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', host_list, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule cleanup (reboot) or mark hosts ready, per job policy."""
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            # no autoserv process was found -- treat the run as a failure
            final_success = False
            num_tests_failed = 0

        reboot_after = self._job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status() == models.HostQueueEntry.Status.ABORTED
                or reboot_after == model_attributes.RebootAfter.ALWAYS
                or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=queue_entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                queue_entry.host.set_status(models.Host.Status.READY)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            # autoserv exited normally -- no crashinfo to collect; finish
            # immediately and mark the task successful
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002382
2383
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # class-wide count of currently running processes for this task type
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes = cls._num_running_processes + 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes = cls._num_running_processes - 1


    @classmethod
    def _max_processes(cls):
        # each subclass defines its own limit
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # until the process has actually started, every tick is another
        # attempt to start it; once started, revert to default behavior
        if not self._process_started():
            self._try_starting_process()
            return
        super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # defer the real run() until a process slot is available
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # launch the command and account for the new process
            super(SelfThrottledPostJobTask, self).run()
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002442
showard21baa452008-10-21 00:08:39 +00002443
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO results parser over a finished job's results dir."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        parse_command = [_parser_path, '--write-pidfile', '-l', '2', '-r',
                         '-o', results_dir]
        return parse_command


    @property
    def num_processes(self):
        # parser processes are excluded from drone process accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # once parsed, the entries proceed to archiving
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002482
2483
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archive_results control file over a finished job's results."""

    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
        super(ArchiveResultsTask, self).prolog()


    def _record_archiver_failure(self):
        # leave a marker file in the results dir recording the archiver's exit
        failed_file = os.path.join(self._working_directory(),
                                   self._ARCHIVING_FAILED_FILE)
        _drone_manager.write_lines_to_file(
                failed_file,
                ['Archiving failed with exit code %s'
                 % self.monitor.exit_code()],
                paired_with_process=self._paired_with_monitor().get_process())


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            self._record_archiver_failure()
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002530
2531
if __name__ == '__main__':
    # script entry point; main() is defined elsewhere in this file
    main()