blob: 13b42d2f798c29a8a90f0e6fc691179f284a6697 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard402934a2009-12-21 22:20:47 +00008import common
showardef519212009-05-08 02:29:53 +00009import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +000010import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardf13a9e22009-12-18 22:54:09 +000011import itertools, logging, weakref, gc
showard402934a2009-12-21 22:20:47 +000012
mbligh8bcd23a2009-02-03 19:14:06 +000013import MySQLdb
showard402934a2009-12-21 22:20:47 +000014
showard043c62a2009-06-10 19:48:57 +000015from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000016from autotest_lib.frontend import setup_django_environment
showard402934a2009-12-21 22:20:47 +000017
18import django.db
19
showard136e6dc2009-06-10 19:38:49 +000020from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000021from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000022from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000023from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
jamesrendd855242010-03-02 22:23:44 +000024from autotest_lib.frontend.afe import model_attributes
showard170873e2009-01-07 00:22:26 +000025from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000026from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000027from autotest_lib.scheduler import status_server, scheduler_config
jamesren883492a2010-02-12 00:45:18 +000028from autotest_lib.scheduler import gc_stats, metahost_scheduler
jamesrenc44ae992010-02-19 00:12:54 +000029from autotest_lib.scheduler import scheduler_models
# Pid-file names used by utils.write_pid()/program_is_alive() to enforce a
# single running scheduler (and its babysitter wrapper).
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# AUTOTEST_DIR in the environment overrides the path derived from __file__.
# (Was os.environ.has_key(), which is deprecated; `in` is the supported form.)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level singletons, populated by initialize()/initialize_globals().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_drone_manager = None
mbligh36768f02008-02-22 18:28:33 +000059
60
def _get_pidfile_timeout_secs():
    """@returns How long (in seconds) to wait for autoserv to write pidfile."""
    minutes = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return 60 * minutes
66
67
mbligh83c1e9e2009-05-01 23:10:41 +000068def _site_init_monitor_db_dummy():
69 return {}
70
71
# Site hook that may supply extra metahost schedulers; resolves to a function
# returning an empty tuple when no site-specific module is installed.
get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())
jamesren883492a2010-02-12 00:45:18 +000075
76
def main():
    """Scheduler entry point.

    Runs the real main loop, logging (and re-raising) any exception that
    escapes it, and always removes this process's pid file on the way out.
    SystemExit is propagated untouched so normal exits are not logged as
    errors.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000088
89
def main_without_exception_handling():
    """Body of main(): parse command-line options, initialize all scheduler
    subsystems, then run the dispatcher tick loop until shutdown.

    Exits with status 1 if the scheduler is disabled in the global config or
    if another monitor_db instance is already running (via initialize()).
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Message fixed: previously read "to enabled it".
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Run the site-specific initialization hook, falling back to the no-op
    # dummy when no site module is installed.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    if options.test:
        # Testing mode: substitute a dummy autoserv and point initialize()
        # at the stress-test database.
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Clean shutdown: flush queued notification emails, stop the status
    # server, and release drone and database resources.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000153
154
def setup_logging():
    """Configure scheduler logging.

    Honors the optional AUTOTEST_SCHEDULER_LOG_DIR and
    AUTOTEST_SCHEDULER_LOG_NAME environment overrides (None when unset).
    """
    environ = os.environ
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
161
162
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful shutdown.

    Only sets the module-level _shutdown flag; the dispatcher loop checks it
    on each iteration and exits cleanly rather than dying mid-tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def initialize():
    """One-time startup: pid-file check, database connection, signal handler,
    and drone manager setup. Exits if another scheduler is already running.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # Enforce a single scheduler instance via the pid file.
    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # Point at the stress-test database instead of the live one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # Make the server tools (e.g. autoserv) findable on PATH.
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    # NOTE: local 'drones' (a comma-separated hostname string from config)
    # shadows the imported 'drones' module for the rest of this function.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000206
207
def initialize_globals():
    """Bind the module-level _drone_manager to the process-wide singleton."""
    global _drone_manager
    _drone_manager = drone_manager.instance()
211
212
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - If True (the default), pass --verbose to autoserv.
    """
    command = [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        command.extend(['-m', machines])
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
237
238
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    E.g. when no metahost scheduler is willing to handle a queue entry.
    """
241
242
class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        # Base list of metahost schedulers; site-selected ones are prepended
        # below so they take precedence.
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready (or status-less) hosts
        with no active queue entry against them."""
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                      "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (containing one %s placeholder for an id list) and
        fold the two-column result into a {left_id: set(right_id)} dict.
        With flip=True the columns are swapped before folding."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_id)} (ids coerced
        to int); flip=True swaps the roles of the columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_id)} via each job's owner's ACLs."""
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_id)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_id)} of label dependencies per job."""
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_id)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return the pair ({label_id: set(host_id)}, {host_id: set(label_id)})
        for the given hosts."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        """Give each metahost scheduler a chance to recover persisted state.

        NOTE: the loop variable shadows the imported metahost_scheduler
        module within this method body.
        """
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        """Rebuild the cached eligibility structures for one scheduling pass.

        @param pending_queue_entries: HostQueueEntries awaiting scheduling;
                only data relevant to their jobs is loaded.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        """Forward the per-tick callback to every metahost scheduler.

        NOTE: the loop variable shadows the imported metahost_scheduler
        module within this method body.
        """
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        """Return a (copied) set of host ids carrying the given label."""
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        """Drop host_id from the cached membership of label_id.

        Raises KeyError if the label has no cached entry.
        """
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        """Claim a host for this cycle, removing it from availability.

        Raises KeyError if the host is not currently available.
        """
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        """Return a (copied) set of host ids barred from the entry's job."""
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels satisfy every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # Logged (not raised); an arbitrary member is still returned.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes ACL, dependency, only_if_needed and atomic
        group checks for the entry's job (invalid hosts bypass all checks)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is available this cycle and flagged invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's specific host if eligible and still
        available; None otherwise."""
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        """Schedule one queue entry.

        Non-metahost entries return their claimed Host (or None); metahost
        entries are delegated to the first willing metahost scheduler.

        @raises SchedulerError if no metahost scheduler accepts the entry.
        """
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to. We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
644
645
showard170873e2009-01-07 00:22:26 +0000646class Dispatcher(object):
    def __init__(self):
        # Active Agent objects being driven by the dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Reverse indexes: host id -> agents, queue entry id -> agents.
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        # Interval between garbage-collection statistics dumps; configured in
        # minutes (default 6 hours), stored here in seconds.
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))
mbligh36768f02008-02-22 18:28:33 +0000663
mbligh36768f02008-02-22 18:28:33 +0000664
showard915958d2009-04-22 21:00:58 +0000665 def initialize(self, recover_hosts=True):
666 self._periodic_cleanup.initialize()
667 self._24hr_upkeep.initialize()
668
jadmanski0afbb632008-06-06 21:10:57 +0000669 # always recover processes
670 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000671
jadmanski0afbb632008-06-06 21:10:57 +0000672 if recover_hosts:
673 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000674
jamesrenc44ae992010-02-19 00:12:54 +0000675 self._host_scheduler.recovery_on_startup()
676
mbligh36768f02008-02-22 18:28:33 +0000677
jadmanski0afbb632008-06-06 21:10:57 +0000678 def tick(self):
showardf13a9e22009-12-18 22:54:09 +0000679 self._garbage_collection()
showard170873e2009-01-07 00:22:26 +0000680 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000681 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000682 self._find_aborting()
showard29f7cd22009-04-29 21:16:24 +0000683 self._process_recurring_runs()
showard8cc058f2009-09-08 16:26:33 +0000684 self._schedule_delay_tasks()
showard8cc058f2009-09-08 16:26:33 +0000685 self._schedule_running_host_queue_entries()
686 self._schedule_special_tasks()
showard65db3932009-10-28 19:54:35 +0000687 self._schedule_new_jobs()
jadmanski0afbb632008-06-06 21:10:57 +0000688 self._handle_agents()
jamesrene21bf412010-02-26 02:30:07 +0000689 self._host_scheduler.tick()
showard170873e2009-01-07 00:22:26 +0000690 _drone_manager.execute_actions()
691 email_manager.manager.send_queued_emails()
showard402934a2009-12-21 22:20:47 +0000692 django.db.reset_queries()
showardf13a9e22009-12-18 22:54:09 +0000693 self._tick_count += 1
mbligh36768f02008-02-22 18:28:33 +0000694
showard97aed502008-11-04 02:01:24 +0000695
mblighf3294cc2009-04-08 21:17:38 +0000696 def _run_cleanup(self):
697 self._periodic_cleanup.run_cleanup_maybe()
698 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000699
mbligh36768f02008-02-22 18:28:33 +0000700
showardf13a9e22009-12-18 22:54:09 +0000701 def _garbage_collection(self):
702 threshold_time = time.time() - self._seconds_between_garbage_stats
703 if threshold_time < self._last_garbage_stats_time:
704 # Don't generate these reports very often.
705 return
706
707 self._last_garbage_stats_time = time.time()
708 # Force a full level 0 collection (because we can, it doesn't hurt
709 # at this interval).
710 gc.collect()
711 logging.info('Logging garbage collector stats on tick %d.',
712 self._tick_count)
713 gc_stats._log_garbage_collector_stats()
714
715
showard170873e2009-01-07 00:22:26 +0000716 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
717 for object_id in object_ids:
718 agent_dict.setdefault(object_id, set()).add(agent)
719
720
721 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
722 for object_id in object_ids:
723 assert object_id in agent_dict
724 agent_dict[object_id].remove(agent)
725
726
showardd1195652009-12-08 22:21:02 +0000727 def add_agent_task(self, agent_task):
728 agent = Agent(agent_task)
jadmanski0afbb632008-06-06 21:10:57 +0000729 self._agents.append(agent)
730 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000731 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
732 self._register_agent_for_ids(self._queue_entry_agents,
733 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000734
showard170873e2009-01-07 00:22:26 +0000735
736 def get_agents_for_entry(self, queue_entry):
737 """
738 Find agents corresponding to the specified queue_entry.
739 """
showardd3dc1992009-04-22 21:01:40 +0000740 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000741
742
743 def host_has_agent(self, host):
744 """
745 Determine if there is currently an Agent present using this host.
746 """
747 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000748
749
jadmanski0afbb632008-06-06 21:10:57 +0000750 def remove_agent(self, agent):
751 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000752 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
753 agent)
754 self._unregister_agent_for_ids(self._queue_entry_agents,
755 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000756
757
showard8cc058f2009-09-08 16:26:33 +0000758 def _host_has_scheduled_special_task(self, host):
759 return bool(models.SpecialTask.objects.filter(host__id=host.id,
760 is_active=False,
761 is_complete=False))
762
763
jadmanski0afbb632008-06-06 21:10:57 +0000764 def _recover_processes(self):
showardd1195652009-12-08 22:21:02 +0000765 agent_tasks = self._create_recovery_agent_tasks()
766 self._register_pidfiles(agent_tasks)
showard170873e2009-01-07 00:22:26 +0000767 _drone_manager.refresh()
showardd1195652009-12-08 22:21:02 +0000768 self._recover_tasks(agent_tasks)
showard8cc058f2009-09-08 16:26:33 +0000769 self._recover_pending_entries()
showardb8900452009-10-12 20:31:01 +0000770 self._check_for_unrecovered_verifying_entries()
showard170873e2009-01-07 00:22:26 +0000771 self._reverify_remaining_hosts()
772 # reinitialize drones after killing orphaned processes, since they can
773 # leave around files when they die
774 _drone_manager.execute_actions()
775 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000776
showard170873e2009-01-07 00:22:26 +0000777
showardd1195652009-12-08 22:21:02 +0000778 def _create_recovery_agent_tasks(self):
779 return (self._get_queue_entry_agent_tasks()
780 + self._get_special_task_agent_tasks(is_active=True))
781
782
783 def _get_queue_entry_agent_tasks(self):
784 # host queue entry statuses handled directly by AgentTasks (Verifying is
785 # handled through SpecialTasks, so is not listed here)
786 statuses = (models.HostQueueEntry.Status.STARTING,
787 models.HostQueueEntry.Status.RUNNING,
788 models.HostQueueEntry.Status.GATHERING,
mbligh4608b002010-01-05 18:22:35 +0000789 models.HostQueueEntry.Status.PARSING,
790 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000791 status_list = ','.join("'%s'" % status for status in statuses)
jamesrenc44ae992010-02-19 00:12:54 +0000792 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardd1195652009-12-08 22:21:02 +0000793 where='status IN (%s)' % status_list)
794
795 agent_tasks = []
796 used_queue_entries = set()
797 for entry in queue_entries:
798 if self.get_agents_for_entry(entry):
799 # already being handled
800 continue
801 if entry in used_queue_entries:
802 # already picked up by a synchronous job
803 continue
804 agent_task = self._get_agent_task_for_queue_entry(entry)
805 agent_tasks.append(agent_task)
806 used_queue_entries.update(agent_task.queue_entries)
807 return agent_tasks
showard170873e2009-01-07 00:22:26 +0000808
809
showardd1195652009-12-08 22:21:02 +0000810 def _get_special_task_agent_tasks(self, is_active=False):
811 special_tasks = models.SpecialTask.objects.filter(
812 is_active=is_active, is_complete=False)
813 return [self._get_agent_task_for_special_task(task)
814 for task in special_tasks]
815
816
817 def _get_agent_task_for_queue_entry(self, queue_entry):
818 """
819 Construct an AgentTask instance for the given active HostQueueEntry,
820 if one can currently run it.
821 @param queue_entry: a HostQueueEntry
822 @returns an AgentTask to run the queue entry
823 """
824 task_entries = queue_entry.job.get_group_entries(queue_entry)
825 self._check_for_duplicate_host_entries(task_entries)
826
827 if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
828 models.HostQueueEntry.Status.RUNNING):
showarda9545c02009-12-18 22:44:26 +0000829 if queue_entry.is_hostless():
830 return HostlessQueueTask(queue_entry=queue_entry)
showardd1195652009-12-08 22:21:02 +0000831 return QueueTask(queue_entries=task_entries)
832 if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
833 return GatherLogsTask(queue_entries=task_entries)
834 if queue_entry.status == models.HostQueueEntry.Status.PARSING:
835 return FinalReparseTask(queue_entries=task_entries)
mbligh4608b002010-01-05 18:22:35 +0000836 if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
837 return ArchiveResultsTask(queue_entries=task_entries)
showardd1195652009-12-08 22:21:02 +0000838
839 raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
jamesrenc44ae992010-02-19 00:12:54 +0000840 'invalid status %s: %s'
841 % (queue_entry.status, queue_entry))
showardd1195652009-12-08 22:21:02 +0000842
843
844 def _check_for_duplicate_host_entries(self, task_entries):
mbligh4608b002010-01-05 18:22:35 +0000845 non_host_statuses = (models.HostQueueEntry.Status.PARSING,
846 models.HostQueueEntry.Status.ARCHIVING)
showardd1195652009-12-08 22:21:02 +0000847 for task_entry in task_entries:
showarda9545c02009-12-18 22:44:26 +0000848 using_host = (task_entry.host is not None
mbligh4608b002010-01-05 18:22:35 +0000849 and task_entry.status not in non_host_statuses)
showarda9545c02009-12-18 22:44:26 +0000850 if using_host:
showardd1195652009-12-08 22:21:02 +0000851 self._assert_host_has_no_agent(task_entry)
852
853
854 def _assert_host_has_no_agent(self, entry):
855 """
856 @param entry: a HostQueueEntry or a SpecialTask
857 """
858 if self.host_has_agent(entry.host):
859 agent = tuple(self._host_agents.get(entry.host.id))[0]
860 raise SchedulerError(
861 'While scheduling %s, host %s already has a host agent %s'
862 % (entry, entry.host, agent.task))
863
864
865 def _get_agent_task_for_special_task(self, special_task):
866 """
867 Construct an AgentTask class to run the given SpecialTask and add it
868 to this dispatcher.
869 @param special_task: a models.SpecialTask instance
870 @returns an AgentTask to run this SpecialTask
871 """
872 self._assert_host_has_no_agent(special_task)
873
874 special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
875 for agent_task_class in special_agent_task_classes:
876 if agent_task_class.TASK_TYPE == special_task.task:
877 return agent_task_class(task=special_task)
878
879 raise SchedulerError('No AgentTask class for task', str(special_task))
880
881
882 def _register_pidfiles(self, agent_tasks):
883 for agent_task in agent_tasks:
884 agent_task.register_necessary_pidfiles()
885
886
887 def _recover_tasks(self, agent_tasks):
888 orphans = _drone_manager.get_orphaned_autoserv_processes()
889
890 for agent_task in agent_tasks:
891 agent_task.recover()
892 if agent_task.monitor and agent_task.monitor.has_process():
893 orphans.discard(agent_task.monitor.get_process())
894 self.add_agent_task(agent_task)
895
896 self._check_for_remaining_orphan_processes(orphans)
showarded2afea2009-07-07 20:54:07 +0000897
898
showard8cc058f2009-09-08 16:26:33 +0000899 def _get_unassigned_entries(self, status):
jamesrenc44ae992010-02-19 00:12:54 +0000900 for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
901 % status):
showard0db3d432009-10-12 20:29:15 +0000902 if entry.status == status and not self.get_agents_for_entry(entry):
903 # The status can change during iteration, e.g., if job.run()
904 # sets a group of queue entries to Starting
showard8cc058f2009-09-08 16:26:33 +0000905 yield entry
906
907
showard6878e8b2009-07-20 22:37:45 +0000908 def _check_for_remaining_orphan_processes(self, orphans):
909 if not orphans:
910 return
911 subject = 'Unrecovered orphan autoserv processes remain'
912 message = '\n'.join(str(process) for process in orphans)
913 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000914
915 die_on_orphans = global_config.global_config.get_config_value(
916 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
917
918 if die_on_orphans:
919 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000920
showard170873e2009-01-07 00:22:26 +0000921
showard8cc058f2009-09-08 16:26:33 +0000922 def _recover_pending_entries(self):
923 for entry in self._get_unassigned_entries(
924 models.HostQueueEntry.Status.PENDING):
showard56824072009-10-12 20:30:21 +0000925 logging.info('Recovering Pending entry %s', entry)
showard8cc058f2009-09-08 16:26:33 +0000926 entry.on_pending()
927
928
showardb8900452009-10-12 20:31:01 +0000929 def _check_for_unrecovered_verifying_entries(self):
jamesrenc44ae992010-02-19 00:12:54 +0000930 queue_entries = scheduler_models.HostQueueEntry.fetch(
showardb8900452009-10-12 20:31:01 +0000931 where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
932 unrecovered_hqes = []
933 for queue_entry in queue_entries:
934 special_tasks = models.SpecialTask.objects.filter(
935 task__in=(models.SpecialTask.Task.CLEANUP,
936 models.SpecialTask.Task.VERIFY),
937 queue_entry__id=queue_entry.id,
938 is_complete=False)
939 if special_tasks.count() == 0:
940 unrecovered_hqes.append(queue_entry)
showardd3dc1992009-04-22 21:01:40 +0000941
showardb8900452009-10-12 20:31:01 +0000942 if unrecovered_hqes:
943 message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
showarde8e37072009-08-20 23:31:30 +0000944 raise SchedulerError(
showard37757f32009-10-19 18:34:24 +0000945 '%d unrecovered verifying host queue entries:\n%s' %
showardb8900452009-10-12 20:31:01 +0000946 (len(unrecovered_hqes), message))
showard170873e2009-01-07 00:22:26 +0000947
948
showard65db3932009-10-28 19:54:35 +0000949 def _get_prioritized_special_tasks(self):
950 """
951 Returns all queued SpecialTasks prioritized for repair first, then
952 cleanup, then verify.
953 """
954 queued_tasks = models.SpecialTask.objects.filter(is_active=False,
955 is_complete=False,
956 host__locked=False)
957 # exclude hosts with active queue entries unless the SpecialTask is for
958 # that queue entry
showard7e67b432010-01-20 01:13:04 +0000959 queued_tasks = models.SpecialTask.objects.add_join(
showardeab66ce2009-12-23 00:03:56 +0000960 queued_tasks, 'afe_host_queue_entries', 'host_id',
961 join_condition='afe_host_queue_entries.active',
showard7e67b432010-01-20 01:13:04 +0000962 join_from_key='host_id', force_left_join=True)
showard65db3932009-10-28 19:54:35 +0000963 queued_tasks = queued_tasks.extra(
showardeab66ce2009-12-23 00:03:56 +0000964 where=['(afe_host_queue_entries.id IS NULL OR '
965 'afe_host_queue_entries.id = '
966 'afe_special_tasks.queue_entry_id)'])
showard6d7b2ff2009-06-10 00:16:47 +0000967
showard65db3932009-10-28 19:54:35 +0000968 # reorder tasks by priority
969 task_priority_order = [models.SpecialTask.Task.REPAIR,
970 models.SpecialTask.Task.CLEANUP,
971 models.SpecialTask.Task.VERIFY]
972 def task_priority_key(task):
973 return task_priority_order.index(task.task)
974 return sorted(queued_tasks, key=task_priority_key)
975
976
showard65db3932009-10-28 19:54:35 +0000977 def _schedule_special_tasks(self):
978 """
979 Execute queued SpecialTasks that are ready to run on idle hosts.
980 """
981 for task in self._get_prioritized_special_tasks():
showard8cc058f2009-09-08 16:26:33 +0000982 if self.host_has_agent(task.host):
showard2fe3f1d2009-07-06 20:19:11 +0000983 continue
showardd1195652009-12-08 22:21:02 +0000984 self.add_agent_task(self._get_agent_task_for_special_task(task))
showard1ff7b2e2009-05-15 23:17:18 +0000985
986
showard170873e2009-01-07 00:22:26 +0000987 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000988 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000989 # should never happen
showarded2afea2009-07-07 20:54:07 +0000990 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000991 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000992 self._reverify_hosts_where(
showard8cc058f2009-09-08 16:26:33 +0000993 "status IN ('Repairing', 'Verifying', 'Cleaning')",
showarded2afea2009-07-07 20:54:07 +0000994 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000995
996
jadmanski0afbb632008-06-06 21:10:57 +0000997 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000998 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000999 full_where='locked = 0 AND invalid = 0 AND ' + where
jamesrenc44ae992010-02-19 00:12:54 +00001000 for host in scheduler_models.Host.fetch(where=full_where):
showard170873e2009-01-07 00:22:26 +00001001 if self.host_has_agent(host):
1002 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +00001003 continue
showard8cc058f2009-09-08 16:26:33 +00001004 if self._host_has_scheduled_special_task(host):
1005 # host will have a special task scheduled on the next cycle
1006 continue
showard170873e2009-01-07 00:22:26 +00001007 if print_message:
showardb18134f2009-03-20 20:52:18 +00001008 logging.info(print_message, host.hostname)
showard8cc058f2009-09-08 16:26:33 +00001009 models.SpecialTask.objects.create(
1010 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00001011 host=models.Host.objects.get(id=host.id))
mbligh36768f02008-02-22 18:28:33 +00001012
1013
jadmanski0afbb632008-06-06 21:10:57 +00001014 def _recover_hosts(self):
1015 # recover "Repair Failed" hosts
1016 message = 'Reverifying dead host %s'
1017 self._reverify_hosts_where("status = 'Repair Failed'",
1018 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +00001019
1020
showard04c82c52008-05-29 19:38:12 +00001021
showardb95b1bd2008-08-15 18:11:04 +00001022 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +00001023 # prioritize by job priority, then non-metahost over metahost, then FIFO
jamesrenc44ae992010-02-19 00:12:54 +00001024 return list(scheduler_models.HostQueueEntry.fetch(
showardeab66ce2009-12-23 00:03:56 +00001025 joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
showardac9ce222008-12-03 18:19:44 +00001026 where='NOT complete AND NOT active AND status="Queued"',
showardeab66ce2009-12-23 00:03:56 +00001027 order_by='afe_jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +00001028
1029
showard89f84db2009-03-12 20:39:13 +00001030 def _refresh_pending_queue_entries(self):
1031 """
1032 Lookup the pending HostQueueEntries and call our HostScheduler
1033 refresh() method given that list. Return the list.
1034
1035 @returns A list of pending HostQueueEntries sorted in priority order.
1036 """
showard63a34772008-08-18 19:32:50 +00001037 queue_entries = self._get_pending_queue_entries()
1038 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +00001039 return []
showardb95b1bd2008-08-15 18:11:04 +00001040
showard63a34772008-08-18 19:32:50 +00001041 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +00001042
showard89f84db2009-03-12 20:39:13 +00001043 return queue_entries
1044
1045
1046 def _schedule_atomic_group(self, queue_entry):
1047 """
1048 Schedule the given queue_entry on an atomic group of hosts.
1049
1050 Returns immediately if there are insufficient available hosts.
1051
1052 Creates new HostQueueEntries based off of queue_entry for the
1053 scheduled hosts and starts them all running.
1054 """
1055 # This is a virtual host queue entry representing an entire
1056 # atomic group, find a group and schedule their hosts.
1057 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1058 queue_entry)
1059 if not group_hosts:
1060 return
showardcbe6f942009-06-17 19:33:49 +00001061
1062 logging.info('Expanding atomic group entry %s with hosts %s',
1063 queue_entry,
1064 ', '.join(host.hostname for host in group_hosts))
jamesren883492a2010-02-12 00:45:18 +00001065
showard89f84db2009-03-12 20:39:13 +00001066 for assigned_host in group_hosts[1:]:
1067 # Create a new HQE for every additional assigned_host.
jamesrenc44ae992010-02-19 00:12:54 +00001068 new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001069 new_hqe.save()
jamesren883492a2010-02-12 00:45:18 +00001070 new_hqe.set_host(assigned_host)
1071 self._run_queue_entry(new_hqe)
1072
1073 # The first assigned host uses the original HostQueueEntry
1074 queue_entry.set_host(group_hosts[0])
1075 self._run_queue_entry(queue_entry)
showard89f84db2009-03-12 20:39:13 +00001076
1077
showarda9545c02009-12-18 22:44:26 +00001078 def _schedule_hostless_job(self, queue_entry):
1079 self.add_agent_task(HostlessQueueTask(queue_entry))
jamesren47bd7372010-03-13 00:58:17 +00001080 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showarda9545c02009-12-18 22:44:26 +00001081
1082
showard89f84db2009-03-12 20:39:13 +00001083 def _schedule_new_jobs(self):
1084 queue_entries = self._refresh_pending_queue_entries()
1085 if not queue_entries:
1086 return
1087
showard63a34772008-08-18 19:32:50 +00001088 for queue_entry in queue_entries:
showarde55955f2009-10-07 20:48:58 +00001089 is_unassigned_atomic_group = (
1090 queue_entry.atomic_group_id is not None
1091 and queue_entry.host_id is None)
jamesren883492a2010-02-12 00:45:18 +00001092
1093 if queue_entry.is_hostless():
showarda9545c02009-12-18 22:44:26 +00001094 self._schedule_hostless_job(queue_entry)
jamesren883492a2010-02-12 00:45:18 +00001095 elif is_unassigned_atomic_group:
1096 self._schedule_atomic_group(queue_entry)
showarde55955f2009-10-07 20:48:58 +00001097 else:
jamesren883492a2010-02-12 00:45:18 +00001098 assigned_host = self._host_scheduler.schedule_entry(queue_entry)
showard65db3932009-10-28 19:54:35 +00001099 if assigned_host and not self.host_has_agent(assigned_host):
jamesren883492a2010-02-12 00:45:18 +00001100 assert assigned_host.id == queue_entry.host_id
1101 self._run_queue_entry(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001102
1103
showard8cc058f2009-09-08 16:26:33 +00001104 def _schedule_running_host_queue_entries(self):
showardd1195652009-12-08 22:21:02 +00001105 for agent_task in self._get_queue_entry_agent_tasks():
1106 self.add_agent_task(agent_task)
showard8cc058f2009-09-08 16:26:33 +00001107
1108
1109 def _schedule_delay_tasks(self):
jamesrenc44ae992010-02-19 00:12:54 +00001110 for entry in scheduler_models.HostQueueEntry.fetch(
1111 where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
showard8cc058f2009-09-08 16:26:33 +00001112 task = entry.job.schedule_delayed_callback_task(entry)
1113 if task:
showardd1195652009-12-08 22:21:02 +00001114 self.add_agent_task(task)
showard8cc058f2009-09-08 16:26:33 +00001115
1116
jamesren883492a2010-02-12 00:45:18 +00001117 def _run_queue_entry(self, queue_entry):
1118 queue_entry.schedule_pre_job_tasks()
mblighd5c95802008-03-05 00:33:46 +00001119
1120
jadmanski0afbb632008-06-06 21:10:57 +00001121 def _find_aborting(self):
jamesrenc44ae992010-02-19 00:12:54 +00001122 for entry in scheduler_models.HostQueueEntry.fetch(
1123 where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001124 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001125 for agent in self.get_agents_for_entry(entry):
1126 agent.abort()
1127 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001128
1129
showard324bf812009-01-20 23:23:38 +00001130 def _can_start_agent(self, agent, num_started_this_cycle,
1131 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001132 # always allow zero-process agents to run
showardd1195652009-12-08 22:21:02 +00001133 if agent.task.num_processes == 0:
showard4c5374f2008-09-04 17:02:56 +00001134 return True
1135 # don't allow any nonzero-process agents to run after we've reached a
1136 # limit (this avoids starvation of many-process agents)
1137 if have_reached_limit:
1138 return False
1139 # total process throttling
showard9bb960b2009-11-19 01:02:11 +00001140 max_runnable_processes = _drone_manager.max_runnable_processes(
showardd1195652009-12-08 22:21:02 +00001141 agent.task.owner_username)
1142 if agent.task.num_processes > max_runnable_processes:
showard4c5374f2008-09-04 17:02:56 +00001143 return False
1144 # if a single agent exceeds the per-cycle throttling, still allow it to
1145 # run when it's the first agent in the cycle
1146 if num_started_this_cycle == 0:
1147 return True
1148 # per-cycle throttling
showardd1195652009-12-08 22:21:02 +00001149 if (num_started_this_cycle + agent.task.num_processes >
1150 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001151 return False
1152 return True
1153
1154
jadmanski0afbb632008-06-06 21:10:57 +00001155 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001156 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001157 have_reached_limit = False
1158 # iterate over copy, so we can remove agents during iteration
1159 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001160 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001161 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001162 have_reached_limit):
1163 have_reached_limit = True
1164 continue
showardd1195652009-12-08 22:21:02 +00001165 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001166 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001167 if agent.is_done():
1168 logging.info("agent finished")
1169 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001170 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001171 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001172
1173
showard29f7cd22009-04-29 21:16:24 +00001174 def _process_recurring_runs(self):
1175 recurring_runs = models.RecurringRun.objects.filter(
1176 start_date__lte=datetime.datetime.now())
1177 for rrun in recurring_runs:
1178 # Create job from template
1179 job = rrun.job
1180 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001181 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001182
1183 host_objects = info['hosts']
1184 one_time_hosts = info['one_time_hosts']
1185 metahost_objects = info['meta_hosts']
1186 dependencies = info['dependencies']
1187 atomic_group = info['atomic_group']
1188
1189 for host in one_time_hosts or []:
1190 this_host = models.Host.create_one_time_host(host.hostname)
1191 host_objects.append(this_host)
1192
1193 try:
1194 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001195 options=options,
showard29f7cd22009-04-29 21:16:24 +00001196 host_objects=host_objects,
1197 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001198 atomic_group=atomic_group)
1199
1200 except Exception, ex:
1201 logging.exception(ex)
1202 #TODO send email
1203
1204 if rrun.loop_count == 1:
1205 rrun.delete()
1206 else:
1207 if rrun.loop_count != 0: # if not infinite loop
1208 # calculate new start_date
1209 difference = datetime.timedelta(seconds=rrun.loop_period)
1210 rrun.start_date = rrun.start_date + difference
1211 rrun.loop_count -= 1
1212 rrun.save()
1213
1214
showard170873e2009-01-07 00:22:26 +00001215class PidfileRunMonitor(object):
1216 """
1217 Client must call either run() to start a new process or
1218 attach_to_existing_process().
1219 """
mbligh36768f02008-02-22 18:28:33 +00001220
showard170873e2009-01-07 00:22:26 +00001221 class _PidfileException(Exception):
1222 """
1223 Raised when there's some unexpected behavior with the pid file, but only
1224 used internally (never allowed to escape this class).
1225 """
mbligh36768f02008-02-22 18:28:33 +00001226
1227
showard170873e2009-01-07 00:22:26 +00001228 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001229 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001230 self._start_time = None
1231 self.pidfile_id = None
1232 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001233
1234
showard170873e2009-01-07 00:22:26 +00001235 def _add_nice_command(self, command, nice_level):
1236 if not nice_level:
1237 return command
1238 return ['nice', '-n', str(nice_level)] + command
1239
1240
    def _set_start_time(self):
        # Record when this monitor started/attached to its process.
        self._start_time = time.time()
1243
1244
showard418785b2009-11-23 20:19:59 +00001245 def run(self, command, working_directory, num_processes, nice_level=None,
1246 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1247 username=None):
showard170873e2009-01-07 00:22:26 +00001248 assert command is not None
1249 if nice_level is not None:
1250 command = ['nice', '-n', str(nice_level)] + command
1251 self._set_start_time()
1252 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001253 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001254 num_processes=num_processes, log_file=log_file,
1255 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001256
1257
showarded2afea2009-07-07 20:54:07 +00001258 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001259 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001260 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001261 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001262 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001263 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001264 if num_processes is not None:
1265 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001266
1267
jadmanski0afbb632008-06-06 21:10:57 +00001268 def kill(self):
showard170873e2009-01-07 00:22:26 +00001269 if self.has_process():
1270 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001271
mbligh36768f02008-02-22 18:28:33 +00001272
showard170873e2009-01-07 00:22:26 +00001273 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001274 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001275 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001276
1277
showard170873e2009-01-07 00:22:26 +00001278 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001279 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001280 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001281 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001282
1283
showard170873e2009-01-07 00:22:26 +00001284 def _read_pidfile(self, use_second_read=False):
1285 assert self.pidfile_id is not None, (
1286 'You must call run() or attach_to_existing_process()')
1287 contents = _drone_manager.get_pidfile_contents(
1288 self.pidfile_id, use_second_read=use_second_read)
1289 if contents.is_invalid():
1290 self._state = drone_manager.PidfileContents()
1291 raise self._PidfileException(contents)
1292 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001293
1294
showard21baa452008-10-21 00:08:39 +00001295 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001296 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1297 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001298 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001299 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001300
1301
    def _get_pidfile_info_helper(self):
        """
        Refresh self._state from the pidfile, detecting lost processes.

        Order matters here: a first read may show a pid with no exit status;
        only if the process is also not running do we re-read the pidfile
        (the process may have *just* exited and written its status), and only
        if the status is still missing do we declare the process lost.
        """
        if self.lost_process:
            # already declared lost; state was forced by on_lost_process()
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001324
showard21baa452008-10-21 00:08:39 +00001325
1326 def _get_pidfile_info(self):
1327 """\
1328 After completion, self._state will contain:
1329 pid=None, exit_status=None if autoserv has not yet run
1330 pid!=None, exit_status=None if autoserv is running
1331 pid!=None, exit_status!=None if autoserv has completed
1332 """
1333 try:
1334 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001335 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001336 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001337
1338
showard170873e2009-01-07 00:22:26 +00001339 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001340 """\
1341 Called when no pidfile is found or no pid is in the pidfile.
1342 """
showard170873e2009-01-07 00:22:26 +00001343 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001344 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001345 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001346 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001347 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001348
1349
showard35162b02009-03-03 02:17:30 +00001350 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001351 """\
1352 Called when autoserv has exited without writing an exit status,
1353 or we've timed out waiting for autoserv to write a pid to the
1354 pidfile. In either case, we just return failure and the caller
1355 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001356
showard170873e2009-01-07 00:22:26 +00001357 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001358 """
1359 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001360 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001361 self._state.exit_status = 1
1362 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001363
1364
jadmanski0afbb632008-06-06 21:10:57 +00001365 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001366 self._get_pidfile_info()
1367 return self._state.exit_status
1368
1369
1370 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001371 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001372 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001373 if self._state.num_tests_failed is None:
1374 return -1
showard21baa452008-10-21 00:08:39 +00001375 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001376
1377
showardcdaeae82009-08-31 18:32:48 +00001378 def try_copy_results_on_drone(self, **kwargs):
1379 if self.has_process():
1380 # copy results logs into the normal place for job results
1381 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1382
1383
1384 def try_copy_to_results_repository(self, source, **kwargs):
1385 if self.has_process():
1386 _drone_manager.copy_to_results_repository(self.get_process(),
1387 source, **kwargs)
1388
1389
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The task object must provide:
        poll() - called periodically so the task can check its status and
                update its internal state.
        is_done() - returns True once the task is finished.
        abort() - called when an abort has been requested; the task must set
                its aborted attribute to True if it actually aborted.

    and must carry these attributes:
        aborted - bool, True if the task was aborted.
        success - bool, True if the task succeeded.
        queue_entry_ids - sequence of HostQueueEntry ids the task handles.
        host_ids - sequence of Host ids the task represents.
    """


    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent()
        self.dispatcher = None

        self.queue_entry_ids = task.queue_entry_ids
        self.host_ids = task.host_ids

        self.started = False
        self.finished = False


    def tick(self):
        """Drive the task one step; record completion when it reports done."""
        self.started = True
        if self.finished:
            return
        self.task.poll()
        if self.task.is_done():
            self.finished = True


    def is_done(self):
        """True once the task has completed (or acknowledged an abort)."""
        return self.finished


    def abort(self):
        """Forward an abort to the task; tasks can choose to ignore aborts."""
        if not self.task:
            return
        self.task.abort()
        if self.task.aborted:
            self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001443
showardd3dc1992009-04-22 21:01:40 +00001444
class AgentTask(object):
    """
    Base class for tasks executed by an Agent.  An AgentTask wraps a single
    external process tracked through a PidfileRunMonitor and manages its
    lifecycle: prolog() -> run() (or recover()) -> tick() polling ->
    finished() -> epilog().

    Subclasses must override _command_line(), _working_directory() and
    owner_username; the other hook methods may be overridden as noted in
    their docstrings.
    """

    class _NullMonitor(object):
        # Stand-in returned by _paired_with_monitor() when a task has no
        # paired predecessor process; it always claims a process exists.
        pidfile_id = None

        def has_process(self):
            return True


    def __init__(self, log_file_name=None):
        """
        @param log_file_name: (optional) name of file to log command output to
        """
        self.done = False       # True once finished() has run
        self.started = False    # True once start()/recover() has run
        self.success = None     # True/False after completion; None before
        self.aborted = False
        self.monitor = None     # PidfileRunMonitor, set by run()/recover()
        self.queue_entry_ids = []
        self.host_ids = []
        self._log_file_name = log_file_name


    def _set_ids(self, host=None, queue_entries=None):
        # Record which hosts / queue entries this task touches, so the
        # dispatcher can locate the agent by host or queue entry id.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # Lazily start on the first poll, then keep ticking until done.
        if not self.started:
            self.start()
        if not self.done:
            self.tick()


    def tick(self):
        """Check the monitored process; mark the task finished once it exits."""
        assert self.monitor
        exit_code = self.monitor.exit_code()
        if exit_code is None:
            # process still running (or pid not yet written)
            return

        success = (exit_code == 0)
        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Record completion and run epilog(); idempotent once done."""
        if self.done:
            assert self.started
            return
        self.started = True
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """
        To be overridden.  Runs once, just before run().
        """
        assert not self.monitor
        self.register_necessary_pidfiles()


    def _log_file(self):
        # Full path of the optional command-output log, or None if unset.
        if not self._log_file_name:
            return None
        return os.path.join(self._working_directory(), self._log_file_name)


    def cleanup(self):
        # Preserve the command-output log in the results repository, if any.
        log_file = self._log_file()
        if self.monitor and log_file:
            self.monitor.try_copy_to_results_repository(log_file)


    def epilog(self):
        """
        To be overridden.  Runs once, after the task completes or aborts.
        """
        self.cleanup()
        logging.info("%s finished with success=%s", type(self).__name__,
                     self.success)


    def start(self):
        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # Kill the underlying process (if any) and finish immediately.
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries must share one execution path; asserts otherwise.
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                    '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                            execution_entry,
                                            first_execution_path,
                                            execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        Copy the execution directory to the results repository.

        @param execution_entries: list of objects with execution_path() method
        @param use_monitor: (optional) monitor to copy from; defaults to
                self.monitor.
        """
        if use_monitor is not None and not use_monitor.has_process():
            return

        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        use_monitor.try_copy_to_results_repository(results_path)


    def _parse_results(self, queue_entries):
        # Hand the entries to the parser stage via their status.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def _archive_results(self, queue_entries):
        # Hand the entries to the archiver stage via their status.
        for queue_entry in queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)


    def _command_line(self):
        """
        Return the command line to run.  Must be overridden.
        """
        raise NotImplementedError


    @property
    def num_processes(self):
        """
        Return the number of processes forked by this AgentTask's process.  It
        may only be approximate.  To be overridden if necessary.
        """
        return 1


    def _paired_with_monitor(self):
        """
        If this AgentTask's process must run on the same machine as some
        previous process, this method should be overridden to return a
        PidfileRunMonitor for that process.
        """
        return self._NullMonitor()


    @property
    def owner_username(self):
        """
        Return login of user responsible for this task.  May be None.  Must be
        overridden.
        """
        raise NotImplementedError


    def _working_directory(self):
        """
        Return the directory where this AgentTask's process executes.  Must be
        overridden.
        """
        raise NotImplementedError


    def _pidfile_name(self):
        """
        Return the name of the pidfile this AgentTask's process uses.  To be
        overridden if necessary.
        """
        return drone_manager.AUTOSERV_PID_FILE


    def _check_paired_results_exist(self):
        # If the paired predecessor left no process, this task cannot run;
        # email a warning and finish as failed.
        if not self._paired_with_monitor().has_process():
            email_manager.manager.enqueue_notify_email(
                    'No paired results in task',
                    'No paired results in task %s at %s'
                    % (self, self._paired_with_monitor().pidfile_id))
            self.finished(False)
            return False
        return True


    def _create_monitor(self):
        assert not self.monitor
        self.monitor = PidfileRunMonitor()


    def run(self):
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.run(
                self._command_line(), self._working_directory(),
                num_processes=self.num_processes,
                nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
                pidfile_name=self._pidfile_name(),
                paired_with_pidfile=self._paired_with_monitor().pidfile_id,
                username=self.owner_username)


    def register_necessary_pidfiles(self):
        # Register our own pidfile and, if present, the paired one, so the
        # drone manager tracks their contents.
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self._working_directory(), self._pidfile_name())
        _drone_manager.register_pidfile(pidfile_id)

        paired_pidfile_id = self._paired_with_monitor().pidfile_id
        if paired_pidfile_id:
            _drone_manager.register_pidfile(paired_pidfile_id)


    def recover(self):
        """Reattach to an already-running process after a scheduler restart."""
        if not self._check_paired_results_exist():
            return

        self._create_monitor()
        self.monitor.attach_to_existing_process(
                self._working_directory(), pidfile_name=self._pidfile_name(),
                num_processes=self.num_processes)
        if not self.monitor.has_process():
            # no process to recover; wait to be started normally
            self.monitor = None
            return

        self.started = True
        logging.info('Recovering process %s for %s at %s'
                     % (self.monitor.get_process(), type(self).__name__,
                        self._working_directory()))


    def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
                                    allowed_host_statuses=None):
        # Sanity-check entry (and optionally host) statuses before starting;
        # raises SchedulerError on any mismatch.
        for entry in queue_entries:
            if entry.status not in allowed_hqe_statuses:
                raise SchedulerError('Queue task attempting to start '
                                     'entry with invalid status %s: %s'
                                     % (entry.status, entry))
            invalid_host_status = (
                    allowed_host_statuses is not None
                    and entry.host.status not in allowed_host_statuses)
            if invalid_host_status:
                raise SchedulerError('Queue task attempting to start on queue '
                                     'entry with invalid host status %s: %s'
                                     % (entry.host.status, entry))
1714
1715
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one key/value pair in keyval-file syntax."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append a single keyval line to the job keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return
        line = self._format_keyval(field, value)
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [line],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """@returns the ('job_queued', <epoch seconds>) keyval for the job."""
        queued_timestamp = int(time.mktime(job.created_on.timetuple()))
        return 'job_queued', queued_timestamp


    def _write_job_finished(self):
        """Record the job's finish time in the keyval file."""
        self._write_keyval_after_job("job_finished", int(time.time()))


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict to the execution as a keyval file at keyval_path."""
        lines = [self._format_keyval(key, value)
                 for key, value in keyval_dict.iteritems()]
        # always end with a newline to allow additional keyvals to be written
        keyval_contents = '\n'.join(lines) + '\n'
        _drone_manager.attach_file_to_execution(self._working_directory(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write the host's platform/labels keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1765
1766
class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    # subclasses must set this to a models.SpecialTask.Task value
    TASK_TYPE = None
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args):
        """
        @param task: the models.SpecialTask DB row this AgentTask executes.
        @param extra_command_args: extra autoserv arguments specific to the
                concrete task type (e.g. ['-v'] for verify).
        """
        super(SpecialAgentTask, self).__init__()

        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'

        self.host = scheduler_models.Host(id=task.host.id)
        self.queue_entry = None
        if task.queue_entry:
            self.queue_entry = scheduler_models.HostQueueEntry(
                    id=task.queue_entry.id)

        self.task = task
        self._extra_command_args = extra_command_args


    def _keyval_path(self):
        # job keyvals live in this task's own execution directory
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _command_line(self):
        return _autoserv_command_line(self.host.hostname,
                                      self._extra_command_args,
                                      queue_entry=self.queue_entry)


    def _working_directory(self):
        return self.task.execution_path()


    @property
    def owner_username(self):
        # special tasks aren't always requested by a user, hence may be None
        if self.task.requested_by:
            return self.task.requested_by.login
        return None


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        self.task.activate()
        self._write_host_keyvals(self.host)


    def _fail_queue_entry(self):
        """
        Fail out the associated queue entry: write final keyvals, copy this
        task's logs into the entry's results location and push the entry on
        to parsing/archiving.
        """
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        # copy results logs into the normal place for job results
        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory() + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        # register the copied-over pidfile so later stages can find it
        pidfile_id = _drone_manager.get_pidfile_id_from(
                self.queue_entry.execution_path(),
                pidfile_name=drone_manager.AUTOSERV_PID_FILE)
        _drone_manager.register_pidfile(pidfile_id)

        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        else:
            self._archive_results([self.queue_entry])


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # We will consider an aborted task to be "Failed"
        self.task.finish(bool(self.success))

        if self.monitor:
            if self.monitor.has_process():
                self._copy_results([self.task])
            if self.monitor.pidfile_id is not None:
                _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001861
1862
class RepairTask(SpecialAgentTask):
    """Runs 'autoserv -R' to repair a host."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, task):
        """\
        @param task: the repair SpecialTask; its queue entry, if any, is
                marked failed when this repair fails.
        """
        # autoserv expects the protection in normalized attribute-name form
        protection_string = host_protections.Protection.get_string(
                task.host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection])

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=self.host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status(models.Host.Status.REPAIRING)


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status(models.Host.Status.READY)
            return

        self.host.set_status(models.Host.Status.REPAIR_FAILED)
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001899
1900
class PreJobTask(SpecialAgentTask):
    """
    Base class for special tasks that run against a host before its job
    (verify, cleanup).  On failure, the autoserv debug log is preserved, the
    queue entry is requeued and a repair is scheduled.
    """

    def _copy_to_results_repository(self):
        # Preserve this task's autoserv debug log alongside the queue
        # entry's results (skipped for metahost entries, which get
        # reassigned instead of failing).
        if not self.queue_entry or self.queue_entry.meta_host:
            return

        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.task.execution_path())
        source = os.path.join(self.task.execution_path(), 'debug',
                              'autoserv.DEBUG')
        destination = os.path.join(
                self.queue_entry.execution_path(), log_name)

        self.monitor.try_copy_to_results_repository(
                source, destination_path=destination)


    def epilog(self):
        """
        On failure: copy logs, then either ignore the failure (DO_NOT_VERIFY
        hosts), fail the entry outright if a repair was already attempted for
        it, or requeue the entry and schedule a repair.
        """
        super(PreJobTask, self).epilog()

        if self.success:
            return

        self._copy_to_results_repository()

        if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
            # effectively ignore failure for these hosts
            self.success = True
            return

        if self.queue_entry:
            self.queue_entry.requeue()

            # a repair was already attempted for this entry; give up on it
            if models.SpecialTask.objects.filter(
                    task=models.SpecialTask.Task.REPAIR,
                    queue_entry__id=self.queue_entry.id):
                self.host.set_status(models.Host.Status.REPAIR_FAILED)
                self._fail_queue_entry()
                return

            queue_entry = models.HostQueueEntry.objects.get(
                    id=self.queue_entry.id)
        else:
            queue_entry = None

        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                task=models.SpecialTask.Task.REPAIR,
                queue_entry=queue_entry,
                requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001950
showard8fe93b52008-11-18 17:53:22 +00001951
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' against a host before its job starts."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, task):
        super(VerifyTask, self).__init__(task, ['-v'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
        self.host.set_status(models.Host.Status.VERIFYING)

        # One verify will do; drop any other queued (not yet started)
        # verifies for this host rather than running them redundantly.
        redundant_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False).exclude(id=self.task.id)
        redundant_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            # failure handling (requeue/repair) is done by PreJobTask.epilog
            return
        if self.queue_entry:
            self.queue_entry.on_pending()
        else:
            self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001986
1987
class CleanupTask(PreJobTask):
    # note this can also run post-job, but when it does, it's running standalone
    # against the host (not related to the job), so it's not considered a
    # PostJobTask

    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, task, recover_run_monitor=None):
        # NOTE(review): recover_run_monitor is accepted for call-site
        # compatibility but is not used by this class.
        super(CleanupTask, self).__init__(task, ['--cleanup'])
        self._set_ids(host=self.host, queue_entries=[self.queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status(models.Host.Status.CLEANING)
        if self.queue_entry:
            self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)


    def _finish_epilog(self):
        """After a successful cleanup tied to a queue entry, either schedule
        a verify for the host or move the entry straight to Pending."""
        if not self.queue_entry or not self.success:
            return

        no_verify = host_protections.Protection.DO_NOT_VERIFY
        should_run_verify = (self.queue_entry.job.run_verify
                             and self.host.protection != no_verify)
        if not should_run_verify:
            self.queue_entry.on_pending()
            return

        entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
        models.SpecialTask.objects.create(
                host=models.Host.objects.get(id=self.host.id),
                queue_entry=entry,
                task=models.SpecialTask.Task.VERIFY)


    def epilog(self):
        super(CleanupTask, self).epilog()

        if self.success:
            self.host.update_field('dirty', 0)
            self.host.set_status(models.Host.Status.READY)

        self._finish_epilog()
2035
2036
class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Common functionality for QueueTask and HostlessQueueTask

    Runs a job's autoserv process over one or more queue entries, writing the
    pre-job keyvals in prolog() and the job-finished keyval (plus abort
    bookkeeping) when the task ends.
    """
    def __init__(self, queue_entries):
        # All entries belong to the same job; use the first as representative.
        super(AbstractQueueTask, self).__init__()
        self.job = queue_entries[0].job
        self.queue_entries = queue_entries


    def _keyval_path(self):
        # Keyval file lives inside the job's execution directory.
        return os.path.join(self._working_directory(), self._KEYVAL_FILE)


    def _write_control_file(self, execution_path):
        """Attach the job's control file to the execution dir via the drone
        manager and return the path it was written to."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_path, self.job.control_file)
        return control_path


    def _command_line(self):
        """Build the autoserv command line for this job's queue entries."""
        execution_path = self.queue_entries[0].execution_path()
        control_path = self._write_control_file(execution_path)
        # Hostless entries contribute no machine to the -m host list.
        hostnames = ','.join(entry.host.hostname
                             for entry in self.queue_entries
                             if not entry.is_hostless())

        execution_tag = self.queue_entries[0].execution_tag()
        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self.job, verbose=False)

        # -c marks the control file as client-side for non-server jobs.
        if not self.job.is_server_job():
            params.append('-c')

        return params


    @property
    def num_processes(self):
        # One autoserv process slot per queue entry.
        return len(self.queue_entries)


    @property
    def owner_username(self):
        return self.job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def prolog(self):
        """Write pre-job keyvals and move all entries to Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = self.job.keyval_dict()
        keyval_dict[queued_key] = queued_time
        group_name = self.queue_entries[0].get_group_name()
        if group_name:
            keyval_dict['host_group_name'] = group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
            queue_entry.set_started_on_now()


    def _write_lost_process_error_file(self):
        # Leave a marker in the results dir explaining the missing process.
        error_file_path = os.path.join(self._working_directory(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Record job completion; no-op if the process never started."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()


    def _write_status_comment(self, comment):
        # Append an INFO line to status.log, paired with the autoserv process
        # so the drone manager writes it on the correct drone.
        _drone_manager.write_lines_to_file(
            os.path.join(self._working_directory(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        # TODO(showard): this conditional is now obsolete, we just need to leave
        # it in temporarily for backwards compatibility over upgrades. delete
        # soon.
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(AbstractQueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(AbstractQueueTask, self).epilog()
        self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002169
2170
class QueueTask(AbstractQueueTask):
    """AbstractQueueTask for jobs that run against one or more hosts."""

    def __init__(self, queue_entries):
        super(QueueTask, self).__init__(queue_entries)
        self._set_ids(queue_entries=queue_entries)


    def prolog(self):
        """Sanity-check statuses, then mark hosts Running and dirty."""
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
                                      models.HostQueueEntry.Status.RUNNING),
                allowed_host_statuses=(models.Host.Status.PENDING,
                                       models.Host.Status.RUNNING))

        super(QueueTask, self).prolog()

        for entry in self.queue_entries:
            self._write_host_keyvals(entry.host)
            entry.host.set_status(models.Host.Status.RUNNING)
            entry.host.update_field('dirty', 1)

        single_machine = (self.job.synch_count == 1
                          and len(self.queue_entries) == 1)
        if single_machine:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _finish_task(self):
        super(QueueTask, self)._finish_task()

        for entry in self.queue_entries:
            entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002202
2203
class HostlessQueueTask(AbstractQueueTask):
    """Runs the autoserv process for a single hostless (server-only) entry."""

    def __init__(self, queue_entry):
        super(HostlessQueueTask, self).__init__([queue_entry])
        self.queue_entry_ids = [queue_entry.id]


    def prolog(self):
        # No host to derive an execution subdir from; use a fixed name.
        entry = self.queue_entries[0]
        entry.update_field('execution_subdir', 'hostless')
        super(HostlessQueueTask, self).prolog()


    def _finish_task(self):
        super(HostlessQueueTask, self)._finish_task()
        # Hostless jobs skip gathering/cleanup and go straight to parsing.
        entry = self.queue_entries[0]
        entry.set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002218
2219
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after a job's autoserv process has exited
    (log gathering, final parse, archiving).  Attaches a PidfileRunMonitor to
    the original autoserv process so its exit status can be read.
    """
    def __init__(self, queue_entries, log_file_name):
        super(PostJobTask, self).__init__(log_file_name=log_file_name)

        self.queue_entries = queue_entries

        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(
                self._working_directory())


    def _command_line(self):
        if _testing_mode:
            return 'true'
        return self._generate_command(
                _drone_manager.absolute_path(self._working_directory()))


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    @property
    def owner_username(self):
        return self.queue_entries[0].job.owner


    def _working_directory(self):
        return self._get_consistent_execution_path(self.queue_entries)


    def _paired_with_monitor(self):
        return self._autoserv_monitor


    def _job_was_aborted(self):
        """
        Return True if this task's queue entries were aborted.

        All entries of a job should agree on abort state; if they disagree,
        send a warning email and conservatively assume aborted.
        """
        was_aborted = None
        for queue_entry in self.queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None:  # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted):  # subsequent entries
                # Bug fix: the original passed a single formatted string to
                # str.join(), which joined its *characters* with ', '.  Build
                # one description per queue entry instead.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self.queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: '
                        + entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _final_status(self):
        """Map the job's outcome onto a final HostQueueEntry status."""
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def _set_all_statuses(self, status):
        for queue_entry in self.queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass


    def _pidfile_label(self):
        # '.autoserv_execute' -> 'autoserv'
        return self._pidfile_name()[1:-len('_execute')]
2295
2296
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, queue_entries, recover_run_monitor=None):
        self._job = queue_entries[0].job
        super(GatherLogsTask, self).__init__(
                queue_entries, log_file_name='.collect_crashinfo.log')
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        machines = ','.join(entry.host.hostname
                            for entry in self.queue_entries)
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(),
                '--use-existing-results', '--collect-crashinfo',
                '-m', machines, '-r', results_dir]


    @property
    def num_processes(self):
        return len(self.queue_entries)


    def _pidfile_name(self):
        return drone_manager.CRASHINFO_PID_FILE


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
                allowed_host_statuses=(models.Host.Status.RUNNING,))

        super(GatherLogsTask, self).prolog()


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._parse_results(self.queue_entries)
        self._reboot_hosts()


    def _reboot_hosts(self):
        """Schedule cleanup (reboot) or release each host, depending on the
        job outcome and its reboot_after policy."""
        if self._autoserv_monitor.has_process():
            final_success = (self._final_status() ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
        else:
            final_success = False
            num_tests_failed = 0

        # always reboot after aborted jobs
        was_aborted = (self._final_status() ==
                       models.HostQueueEntry.Status.ABORTED)
        reboot_after = self._job.reboot_after
        all_tests_passed = final_success and num_tests_failed == 0
        do_reboot = (was_aborted
                     or reboot_after == model_attributes.RebootAfter.ALWAYS
                     or (reboot_after ==
                         model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
                         and all_tests_passed))

        for entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                models.SpecialTask.objects.create(
                        host=models.Host.objects.get(id=entry.host.id),
                        task=models.SpecialTask.Task.CLEANUP,
                        requested_by=self._job.owner_model())
            else:
                entry.host.set_status(models.Host.Status.READY)


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if exit_code is None or os.WIFSIGNALED(exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002382
2383
class SelfThrottledPostJobTask(PostJobTask):
    """
    Special AgentTask subclass that maintains its own global process limit.
    """
    # Class-wide count of processes started by all instances of a subclass.
    _num_running_processes = 0


    @classmethod
    def _increment_running_processes(cls):
        cls._num_running_processes += 1


    @classmethod
    def _decrement_running_processes(cls):
        cls._num_running_processes -= 1


    @classmethod
    def _max_processes(cls):
        raise NotImplementedError


    @classmethod
    def _can_run_new_process(cls):
        return cls._num_running_processes < cls._max_processes()


    def _process_started(self):
        return bool(self.monitor)


    def tick(self):
        # override tick to keep trying to start until the process count goes
        # down and we can, at which point we revert to default behavior
        if not self._process_started():
            self._try_starting_process()
        else:
            super(SelfThrottledPostJobTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_process()


    def _try_starting_process(self):
        if self._can_run_new_process():
            # actually run the command
            super(SelfThrottledPostJobTask, self).run()
            self._increment_running_processes()


    def finished(self, success):
        super(SelfThrottledPostJobTask, self).finished(success)
        # Only release a slot if we actually consumed one.
        if self._process_started():
            self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002442
showard21baa452008-10-21 00:08:39 +00002443
class FinalReparseTask(SelfThrottledPostJobTask):
    """Runs the TKO parser over a finished job's results directory."""

    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]


    @property
    def num_processes(self):
        # don't include parser processes in accounting
        return 0


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002482
2483
class ArchiveResultsTask(SelfThrottledPostJobTask):
    """Runs the archiving control file over a job's results, then sets the
    final status on all of the job's queue entries."""

    # Marker file written into the results dir when archiving fails.
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        control_file = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                                    'archive_results.control.srv')
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                control_file]


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            # Record the failure alongside the results for later inspection.
            failed_path = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            _drone_manager.write_lines_to_file(
                    failed_path,
                    ['Archiving failed with exit code %s'
                     % self.monitor.exit_code()],
                    paired_with_process=self._paired_with_monitor().get_process())
        self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002530
2531
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()