#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats, metahost_scheduler
from autotest_lib.scheduler import scheduler_models

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_drone_manager = None


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


get_site_metahost_schedulers = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_metahost_scheduler',
        'get_metahost_schedulers', lambda : ())


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that the scheduler is under '
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled; set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)


    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()

def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma-separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
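    # Illustrative example only -- the machine names and job fields below are
    # made up, and the actual argv depends on the installed drone paths:
    #   _autoserv_command_line('host1,host2', ['-P', 'tag'], job=some_job)
    #   => [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
    #       '-m', 'host1,host2', '-u', some_job.owner, '-l', some_job.name,
    #       '--verbose', '-P', 'tag']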


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(metahost_scheduler.HostSchedulingUtility):
    """Handles the logic for choosing when to run jobs and on which hosts.

    This class makes several queries to the database on each tick, building up
    some auxiliary data structures and using them to determine which hosts are
    eligible to run which jobs, taking into account all the various factors that
    affect that.

    In the past this was done with one or two very large, complex database
    queries. It has proven much simpler and faster to build these auxiliary
    data structures and perform the logic in Python.
    """
    def __init__(self):
        self._metahost_schedulers = metahost_scheduler.get_metahost_schedulers()

        # load site-specific scheduler selected in global_config
        site_schedulers_str = global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION, 'site_metahost_schedulers',
                default='')
        site_schedulers = set(site_schedulers_str.split(','))
        for scheduler in get_site_metahost_schedulers():
            if type(scheduler).__name__ in site_schedulers:
                # always prepend, so site schedulers take precedence
                self._metahost_schedulers = (
                        [scheduler] + self._metahost_schedulers)
        logging.info('Metahost schedulers: %s',
                     ', '.join(type(scheduler).__name__ for scheduler
                               in self._metahost_schedulers))


    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = scheduler_models.Host.fetch(
            joins='LEFT JOIN afe_host_queue_entries AS active_hqe '
                  'ON (afe_hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT afe_hosts.locked "
                  "AND (afe_hosts.status IS NULL "
                  "OR afe_hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
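        # Example (illustrative values, not from a live query):
        #   rows = [(1, 10), (1, 11), (2, 10)]
        #   _process_many2many_dict(rows)  =>  {1: set([10, 11]), 2: set([10])}
        #   _process_many2many_dict(rows, flip=True)
        #                                  =>  {10: set([1, 2]), 11: set([1])}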


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT afe_jobs.id, afe_acl_groups_users.aclgroup_id
        FROM afe_jobs
        INNER JOIN afe_users ON afe_users.login = afe_jobs.owner
        INNER JOIN afe_acl_groups_users ON
                afe_acl_groups_users.user_id = afe_users.id
        WHERE afe_jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM afe_ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM afe_jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM afe_acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM afe_hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label
                    in scheduler_models.Label.fetch())


    def recovery_on_startup(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.recovery_on_startup()


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def tick(self):
        for metahost_scheduler in self._metahost_schedulers:
            metahost_scheduler.tick()


    def hosts_in_label(self, label_id):
        return set(self._label_hosts.get(label_id, ()))


    def remove_host_from_label(self, host_id, label_id):
        self._label_hosts[label_id].remove(host_id)


    def pop_host(self, host_id):
        return self._hosts_available.pop(host_id)


    def ineligible_hosts_for_entry(self, queue_entry):
        return set(self._ineligible_hosts.get(queue_entry.job_id, ()))


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True
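        # Example (hypothetical label name): a host carries an only_if_needed
        # label "rare_hardware".  A metahost entry may use this host only if
        # the job depends on "rare_hardware" or requested it as the metahost
        # label; any other metahost entry is rejected by this check.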


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)

    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise. If more than one atomic group is
        found among the labels, an error is logged and one of them is
        returned arbitrarily.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()
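        # Example (illustrative ids): labels whose atomic_group_id values are
        # (None, 5, 5) yield atomic_ids == set([5]) and return 5; values
        # (None, 5, 7) log an error and return whichever of 5 or 7 pop()
        # happens to remove.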


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Look up the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self.is_host_usable(host_id)
                   and self.is_host_eligible_for_job(host_id, queue_entry))


    def is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self.is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def schedule_entry(self, queue_entry):
        if queue_entry.host_id is not None:
            return self._schedule_non_metahost(queue_entry)

        for scheduler in self._metahost_schedulers:
            if scheduler.can_schedule_metahost(queue_entry):
                scheduler.schedule_metahost(queue_entry, self)
                return None

        raise SchedulerError('No metahost scheduler to handle %s' % queue_entry)
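        # A minimal sketch of a metahost scheduler, assuming only the
        # can_schedule_metahost/schedule_metahost interface used above.  The
        # class name below is hypothetical; the scheduling_utility methods are
        # the ones this class provides:
        #
        #   class SiteLabelScheduler(object):
        #       def can_schedule_metahost(self, queue_entry):
        #           return bool(queue_entry.meta_host)
        #
        #       def schedule_metahost(self, queue_entry, scheduling_utility):
        #           # pick any usable, eligible host in the metahost label
        #           for host_id in scheduling_utility.hosts_in_label(
        #                   queue_entry.meta_host):
        #               if (scheduling_utility.is_host_usable(host_id)
        #                       and scheduling_utility.is_host_eligible_for_job(
        #                               host_id, queue_entry)):
        #                   host = scheduling_utility.pop_host(host_id)
        #                   queue_entry.set_host(host)
        #                   return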


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self.hosts_in_label(queue_entry.meta_host)
        ineligible_host_ids = self.ineligible_hosts_for_entry(queue_entry)

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self.hosts_in_label(atomic_label_id))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=scheduler_models.Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
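        # Example (illustrative numbers): for a job with synch_count=3 in an
        # atomic group with max_number_of_machines=5, a label with 4 eligible
        # hosts satisfies the job and all 4 are returned; 7 eligible hosts are
        # trimmed to the first 5 after sorting; a label with only 2 is skipped.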


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()

    def tick(self):
        # One scheduler cycle: refresh drone and database state, schedule
        # pending work, let agents make progress, then flush side effects
        # (drone actions and queued emails).
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        # Clear Django's per-connection query log so it doesn't grow without
        # bound in this long-running process.
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                           % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
            where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
            queued_tasks, 'afe_host_queue_entries', 'host_id',
            join_condition='afe_host_queue_entries.active',
            join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
            where=['(afe_host_queue_entries.id IS NULL OR '
                   'afe_host_queue_entries.id = '
                   'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))

    def _refresh_pending_queue_entries(self):
        """
        Look up the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire atomic
        # group; find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                queue_entry.atomic_group_id is not None
                and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
            agent.task.owner_username)
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
1153
1154
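    # Step each started agent once per cycle: start new agents only when
    # _can_start_agent() permits, and reap agents that report is_done().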
jadmanski0afbb632008-06-06 21:10:57 +00001155 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001156 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001157 have_reached_limit = False
1158 # iterate over copy, so we can remove agents during iteration
1159 for agent in list(self._agents):
showard8cc058f2009-09-08 16:26:33 +00001160 if not agent.started:
showard324bf812009-01-20 23:23:38 +00001161 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001162 have_reached_limit):
1163 have_reached_limit = True
1164 continue
showardd1195652009-12-08 22:21:02 +00001165 num_started_this_cycle += agent.task.num_processes
showard4c5374f2008-09-04 17:02:56 +00001166 agent.tick()
showard8cc058f2009-09-08 16:26:33 +00001167 if agent.is_done():
1168 logging.info("agent finished")
1169 self.remove_agent(agent)
showarda9435c02009-05-13 21:28:17 +00001170 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001171 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001172
1173
showard29f7cd22009-04-29 21:16:24 +00001174 def _process_recurring_runs(self):
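        # Instantiate jobs from RecurringRun templates whose start_date has
        # passed. loop_count semantics, per the code below: 1 means this is
        # the final run (delete the template); 0 means repeat forever;
        # otherwise decrement and advance start_date by loop_period.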
1175 recurring_runs = models.RecurringRun.objects.filter(
1176 start_date__lte=datetime.datetime.now())
1177 for rrun in recurring_runs:
1178 # Create job from template
1179 job = rrun.job
1180 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001181 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001182
1183 host_objects = info['hosts']
1184 one_time_hosts = info['one_time_hosts']
1185 metahost_objects = info['meta_hosts']
1186 dependencies = info['dependencies']
1187 atomic_group = info['atomic_group']
1188
1189 for host in one_time_hosts or []:
1190 this_host = models.Host.create_one_time_host(host.hostname)
1191 host_objects.append(this_host)
1192
1193 try:
1194 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001195 options=options,
showard29f7cd22009-04-29 21:16:24 +00001196 host_objects=host_objects,
1197 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001198 atomic_group=atomic_group)
1199
1200 except Exception, ex:
1201 logging.exception(ex)
1202 # TODO: send email
1203
1204 if rrun.loop_count == 1:
1205 rrun.delete()
1206 else:
1207 if rrun.loop_count != 0: # if not infinite loop
1208 # calculate new start_date
1209 difference = datetime.timedelta(seconds=rrun.loop_period)
1210 rrun.start_date = rrun.start_date + difference
1211 rrun.loop_count -= 1
1212 rrun.save()
1213
1214
showard170873e2009-01-07 00:22:26 +00001215class PidfileRunMonitor(object):
1216 """
1217 Client must call either run() to start a new process or
1218 attach_to_existing_process().
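
    Typical use, as an illustrative sketch only (names such as 'command' and
    'owner' are placeholders):

        monitor = PidfileRunMonitor()
        monitor.run(command, working_directory, num_processes=1,
                    nice_level=AUTOSERV_NICE_LEVEL, username=owner)
        # ...polled each scheduler cycle...
        if monitor.exit_code() is not None:
            success = (monitor.exit_code() == 0)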
1219 """
mbligh36768f02008-02-22 18:28:33 +00001220
showard170873e2009-01-07 00:22:26 +00001221 class _PidfileException(Exception):
1222 """
1223 Raised when there's some unexpected behavior with the pid file, but only
1224 used internally (never allowed to escape this class).
1225 """
mbligh36768f02008-02-22 18:28:33 +00001226
1227
showard170873e2009-01-07 00:22:26 +00001228 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001229 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001230 self._start_time = None
1231 self.pidfile_id = None
1232 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001233
1234
showard170873e2009-01-07 00:22:26 +00001235 def _add_nice_command(self, command, nice_level):
1236 if not nice_level:
1237 return command
1238 return ['nice', '-n', str(nice_level)] + command
1239
1240
1241 def _set_start_time(self):
1242 self._start_time = time.time()
1243
1244
showard418785b2009-11-23 20:19:59 +00001245 def run(self, command, working_directory, num_processes, nice_level=None,
1246 log_file=None, pidfile_name=None, paired_with_pidfile=None,
1247 username=None):
showard170873e2009-01-07 00:22:26 +00001248 assert command is not None
1249 if nice_level is not None:
1250 command = ['nice', '-n', str(nice_level)] + command
1251 self._set_start_time()
1252 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001253 command, working_directory, pidfile_name=pidfile_name,
showard418785b2009-11-23 20:19:59 +00001254 num_processes=num_processes, log_file=log_file,
1255 paired_with_pidfile=paired_with_pidfile, username=username)
showard170873e2009-01-07 00:22:26 +00001256
1257
showarded2afea2009-07-07 20:54:07 +00001258 def attach_to_existing_process(self, execution_path,
jamesrenc44ae992010-02-19 00:12:54 +00001259 pidfile_name=drone_manager.AUTOSERV_PID_FILE,
showardd1195652009-12-08 22:21:02 +00001260 num_processes=None):
showard170873e2009-01-07 00:22:26 +00001261 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001262 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001263 execution_path, pidfile_name=pidfile_name)
showardd1195652009-12-08 22:21:02 +00001264 if num_processes is not None:
1265 _drone_manager.declare_process_count(self.pidfile_id, num_processes)
mblighbb421852008-03-11 22:36:16 +00001266
1267
jadmanski0afbb632008-06-06 21:10:57 +00001268 def kill(self):
showard170873e2009-01-07 00:22:26 +00001269 if self.has_process():
1270 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001271
mbligh36768f02008-02-22 18:28:33 +00001272
showard170873e2009-01-07 00:22:26 +00001273 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001274 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001275 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001276
1277
showard170873e2009-01-07 00:22:26 +00001278 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001279 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001280 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001281 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001282
1283
showard170873e2009-01-07 00:22:26 +00001284 def _read_pidfile(self, use_second_read=False):
1285 assert self.pidfile_id is not None, (
1286 'You must call run() or attach_to_existing_process()')
1287 contents = _drone_manager.get_pidfile_contents(
1288 self.pidfile_id, use_second_read=use_second_read)
1289 if contents.is_invalid():
1290 self._state = drone_manager.PidfileContents()
1291 raise self._PidfileException(contents)
1292 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001293
1294
showard21baa452008-10-21 00:08:39 +00001295 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001296 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1297 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001298 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001299 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001300
1301
1302 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001303 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001304 return
mblighbb421852008-03-11 22:36:16 +00001305
showard21baa452008-10-21 00:08:39 +00001306 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001307
showard170873e2009-01-07 00:22:26 +00001308 if self._state.process is None:
1309 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001310 return
mbligh90a549d2008-03-25 23:52:34 +00001311
showard21baa452008-10-21 00:08:39 +00001312 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001313 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001314 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001315 return
mbligh90a549d2008-03-25 23:52:34 +00001316
showard170873e2009-01-07 00:22:26 +00001317 # pid but no running process - maybe process *just* exited
1318 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001319 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001320 # autoserv exited without writing an exit code
1321 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001322 self._handle_pidfile_error(
1323 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001324
showard21baa452008-10-21 00:08:39 +00001325
1326 def _get_pidfile_info(self):
1327 """\
1328 After completion, self._state will contain:
1329 pid=None, exit_status=None if autoserv has not yet run
1330 pid!=None, exit_status=None if autoserv is running
1331 pid!=None, exit_status!=None if autoserv has completed
1332 """
1333 try:
1334 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001335 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001336 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001337
1338
showard170873e2009-01-07 00:22:26 +00001339 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001340 """\
1341 Called when no pidfile is found or no pid is in the pidfile.
1342 """
showard170873e2009-01-07 00:22:26 +00001343 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001344 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001345 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001346 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001347 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001348
1349
showard35162b02009-03-03 02:17:30 +00001350 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001351 """\
1352 Called when autoserv has exited without writing an exit status,
1353 or we've timed out waiting for autoserv to write a pid to the
1354 pidfile. In either case, we just return failure and the caller
1355 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001356
showard170873e2009-01-07 00:22:26 +00001357 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001358 """
1359 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001360 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001361 self._state.exit_status = 1
1362 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001363
1364
jadmanski0afbb632008-06-06 21:10:57 +00001365 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001366 self._get_pidfile_info()
1367 return self._state.exit_status
1368
1369
1370 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001371 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001372 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001373 if self._state.num_tests_failed is None:
1374 return -1
showard21baa452008-10-21 00:08:39 +00001375 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001376
1377
showardcdaeae82009-08-31 18:32:48 +00001378 def try_copy_results_on_drone(self, **kwargs):
1379 if self.has_process():
1380 # copy results logs into the normal place for job results
1381 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1382
1383
1384 def try_copy_to_results_repository(self, source, **kwargs):
1385 if self.has_process():
1386 _drone_manager.copy_to_results_repository(self.get_process(),
1387 source, **kwargs)
1388
1389
mbligh36768f02008-02-22 18:28:33 +00001390class Agent(object):
showard77182562009-06-10 00:16:05 +00001391 """
showard8cc058f2009-09-08 16:26:33 +00001392 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001393
1394 The following methods are required on all task objects:
1395 poll() - Called periodically to let the task check its status and
1396 update its internal state.
1397 is_done() - Returns True if the task is finished.
1398 abort() - Called when an abort has been requested. The task must
1399 set its aborted attribute to True if it actually aborted.
1400
1401 The following attributes are required on all task objects:
1402 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001403 success - bool, True if this task succeeded.
1404 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1405 host_ids - A sequence of Host ids this task represents.
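
    A minimal task satisfying this interface might look like the following
    sketch (illustrative only, not used by the scheduler):

        class NoopTask(object):
            aborted = False
            success = None
            queue_entry_ids = ()
            host_ids = ()

            def poll(self):
                self.success = True

            def is_done(self):
                return self.success is not None

            def abort(self):
                self.aborted = True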
showard77182562009-06-10 00:16:05 +00001406 """
1407
1408
showard418785b2009-11-23 20:19:59 +00001409 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001410 """
showard8cc058f2009-09-08 16:26:33 +00001411 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001412 """
showard8cc058f2009-09-08 16:26:33 +00001413 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001414
showard77182562009-06-10 00:16:05 +00001415 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001416 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001417
showard8cc058f2009-09-08 16:26:33 +00001418 self.queue_entry_ids = task.queue_entry_ids
1419 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001420
showard8cc058f2009-09-08 16:26:33 +00001421 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001422 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001423
1424
jadmanski0afbb632008-06-06 21:10:57 +00001425 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001426 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001427 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001428 self.task.poll()
1429 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001430 self.finished = True
showardec113162008-05-08 00:52:49 +00001431
1432
jadmanski0afbb632008-06-06 21:10:57 +00001433 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001434 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001435
1436
showardd3dc1992009-04-22 21:01:40 +00001437 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001438 if self.task:
1439 self.task.abort()
1440 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001441 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001442 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001443
showardd3dc1992009-04-22 21:01:40 +00001444
mbligh36768f02008-02-22 18:28:33 +00001445class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001446 class _NullMonitor(object):
1447 pidfile_id = None
1448
1449 def has_process(self):
1450 return True
1451
1452
1453 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001454 """
showardd1195652009-12-08 22:21:02 +00001455 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001456 """
jadmanski0afbb632008-06-06 21:10:57 +00001457 self.done = False
showardd1195652009-12-08 22:21:02 +00001458 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001459 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001460 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001461 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001462 self.queue_entry_ids = []
1463 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001464 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001465
1466
1467 def _set_ids(self, host=None, queue_entries=None):
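        # Derive host/queue-entry id lists from whichever argument is given.
        # queue_entries may arrive as [None] (e.g. a pre-job task with no
        # queue entry), in which case we fall back to the host argument.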
1468 if queue_entries and queue_entries != [None]:
1469 self.host_ids = [entry.host.id for entry in queue_entries]
1470 self.queue_entry_ids = [entry.id for entry in queue_entries]
1471 else:
1472 assert host
1473 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001474
1475
jadmanski0afbb632008-06-06 21:10:57 +00001476 def poll(self):
showard08a36412009-05-05 01:01:13 +00001477 if not self.started:
1478 self.start()
showardd1195652009-12-08 22:21:02 +00001479 if not self.done:
1480 self.tick()
showard08a36412009-05-05 01:01:13 +00001481
1482
1483 def tick(self):
showardd1195652009-12-08 22:21:02 +00001484 assert self.monitor
1485 exit_code = self.monitor.exit_code()
1486 if exit_code is None:
1487 return
mbligh36768f02008-02-22 18:28:33 +00001488
showardd1195652009-12-08 22:21:02 +00001489 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001490 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001491
1492
jadmanski0afbb632008-06-06 21:10:57 +00001493 def is_done(self):
1494 return self.done
mbligh36768f02008-02-22 18:28:33 +00001495
1496
jadmanski0afbb632008-06-06 21:10:57 +00001497 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001498 if self.done:
showardd1195652009-12-08 22:21:02 +00001499 assert self.started
showard08a36412009-05-05 01:01:13 +00001500 return
showardd1195652009-12-08 22:21:02 +00001501 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001502 self.done = True
1503 self.success = success
1504 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001505
1506
jadmanski0afbb632008-06-06 21:10:57 +00001507 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001508 """
1509 To be overridden; subclasses must call this base implementation.
1510 """
showarded2afea2009-07-07 20:54:07 +00001511 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001512 self.register_necessary_pidfiles()
1513
1514
1515 def _log_file(self):
1516 if not self._log_file_name:
1517 return None
1518 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001519
mbligh36768f02008-02-22 18:28:33 +00001520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001522 log_file = self._log_file()
1523 if self.monitor and log_file:
1524 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001525
1526
jadmanski0afbb632008-06-06 21:10:57 +00001527 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001528 """
1529 To be overridden; subclasses must call this base implementation.
1530 """
jadmanski0afbb632008-06-06 21:10:57 +00001531 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001532 logging.info("%s finished with success=%s", type(self).__name__,
1533 self.success)
1534
mbligh36768f02008-02-22 18:28:33 +00001535
1536
jadmanski0afbb632008-06-06 21:10:57 +00001537 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001538 if not self.started:
1539 self.prolog()
1540 self.run()
1541
1542 self.started = True
1543
1544
1545 def abort(self):
1546 if self.monitor:
1547 self.monitor.kill()
1548 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001549 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001550 self.cleanup()
1551
1552
showarded2afea2009-07-07 20:54:07 +00001553 def _get_consistent_execution_path(self, execution_entries):
1554 first_execution_path = execution_entries[0].execution_path()
1555 for execution_entry in execution_entries[1:]:
1556 assert execution_entry.execution_path() == first_execution_path, (
1557 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1558 execution_entry,
1559 first_execution_path,
1560 execution_entries[0]))
1561 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001562
1563
showarded2afea2009-07-07 20:54:07 +00001564 def _copy_results(self, execution_entries, use_monitor=None):
1565 """
1566 @param execution_entries: list of objects with execution_path() method
1567 """
showard6d1c1432009-08-20 23:30:39 +00001568 if use_monitor is not None and not use_monitor.has_process():
1569 return
1570
showarded2afea2009-07-07 20:54:07 +00001571 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001572 if use_monitor is None:
1573 assert self.monitor
1574 use_monitor = self.monitor
1575 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001576 execution_path = self._get_consistent_execution_path(execution_entries)
1577 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001578 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001579
showarda1e74b32009-05-12 17:32:04 +00001580
1581 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001582 for queue_entry in queue_entries:
1583 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001584
1585
mbligh4608b002010-01-05 18:22:35 +00001586 def _archive_results(self, queue_entries):
1587 for queue_entry in queue_entries:
1588 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001589
1590
showardd1195652009-12-08 22:21:02 +00001591 def _command_line(self):
1592 """
1593 Return the command line to run. Must be overridden.
1594 """
1595 raise NotImplementedError
1596
1597
1598 @property
1599 def num_processes(self):
1600 """
1601 Return the number of processes forked by this AgentTask's process. It
1602 may only be approximate. To be overridden if necessary.
1603 """
1604 return 1
1605
1606
1607 def _paired_with_monitor(self):
1608 """
1609 If this AgentTask's process must run on the same machine as some
1610 previous process, this method should be overridden to return a
1611 PidfileRunMonitor for that process.
1612 """
1613 return self._NullMonitor()
1614
1615
1616 @property
1617 def owner_username(self):
1618 """
1619 Return login of user responsible for this task. May be None. Must be
1620 overridden.
1621 """
1622 raise NotImplementedError
1623
1624
1625 def _working_directory(self):
1626 """
1627 Return the directory where this AgentTask's process executes. Must be
1628 overridden.
1629 """
1630 raise NotImplementedError
1631
1632
1633 def _pidfile_name(self):
1634 """
1635 Return the name of the pidfile this AgentTask's process uses. To be
1636 overridden if necessary.
1637 """
jamesrenc44ae992010-02-19 00:12:54 +00001638 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001639
1640
1641 def _check_paired_results_exist(self):
1642 if not self._paired_with_monitor().has_process():
1643 email_manager.manager.enqueue_notify_email(
1644 'No paired results in task',
1645 'No paired results in task %s at %s'
1646 % (self, self._paired_with_monitor().pidfile_id))
1647 self.finished(False)
1648 return False
1649 return True
1650
1651
1652 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001653 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001654 self.monitor = PidfileRunMonitor()
1655
1656
1657 def run(self):
1658 if not self._check_paired_results_exist():
1659 return
1660
1661 self._create_monitor()
1662 self.monitor.run(
1663 self._command_line(), self._working_directory(),
1664 num_processes=self.num_processes,
1665 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1666 pidfile_name=self._pidfile_name(),
1667 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1668 username=self.owner_username)
1669
1670
1671 def register_necessary_pidfiles(self):
1672 pidfile_id = _drone_manager.get_pidfile_id_from(
1673 self._working_directory(), self._pidfile_name())
1674 _drone_manager.register_pidfile(pidfile_id)
1675
1676 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1677 if paired_pidfile_id:
1678 _drone_manager.register_pidfile(paired_pidfile_id)
1679
1680
1681 def recover(self):
1682 if not self._check_paired_results_exist():
1683 return
1684
1685 self._create_monitor()
1686 self.monitor.attach_to_existing_process(
1687 self._working_directory(), pidfile_name=self._pidfile_name(),
1688 num_processes=self.num_processes)
1689 if not self.monitor.has_process():
1690 # no process to recover; wait to be started normally
1691 self.monitor = None
1692 return
1693
1694 self.started = True
1695 logging.info('Recovering process %s for %s at %s'
1696 % (self.monitor.get_process(), type(self).__name__,
1697 self._working_directory()))
mbligh36768f02008-02-22 18:28:33 +00001698
1699
mbligh4608b002010-01-05 18:22:35 +00001700 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1701 allowed_host_statuses=None):
1702 for entry in queue_entries:
1703 if entry.status not in allowed_hqe_statuses:
1704 raise SchedulerError('Queue task attempting to start '
1705 'entry with invalid status %s: %s'
1706 % (entry.status, entry))
1707 invalid_host_status = (
1708 allowed_host_statuses is not None
1709 and entry.host.status not in allowed_host_statuses)
1710 if invalid_host_status:
1711 raise SchedulerError('Queue task attempting to start on queue '
1712 'entry with invalid host status %s: %s'
1713 % (entry.host.status, entry))
1714
1715
showardd9205182009-04-27 20:09:55 +00001716class TaskWithJobKeyvals(object):
1717 """AgentTask mixin providing functionality to help with job keyval files."""
1718 _KEYVAL_FILE = 'keyval'
1719 def _format_keyval(self, key, value):
1720 return '%s=%s' % (key, value)
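    # e.g. _format_keyval('job_queued', 1267000000) -> 'job_queued=1267000000'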
1721
1722
1723 def _keyval_path(self):
1724 """Subclasses must override this"""
1725 raise NotImplementedError
1726
1727
1728 def _write_keyval_after_job(self, field, value):
1729 assert self.monitor
1730 if not self.monitor.has_process():
1731 return
1732 _drone_manager.write_lines_to_file(
1733 self._keyval_path(), [self._format_keyval(field, value)],
1734 paired_with_process=self.monitor.get_process())
1735
1736
1737 def _job_queued_keyval(self, job):
1738 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1739
1740
1741 def _write_job_finished(self):
1742 self._write_keyval_after_job("job_finished", int(time.time()))
1743
1744
showarddb502762009-09-09 15:31:20 +00001745 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1746 keyval_contents = '\n'.join(self._format_keyval(key, value)
1747 for key, value in keyval_dict.iteritems())
1748 # always end with a newline to allow additional keyvals to be written
1749 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001750 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001751 keyval_contents,
1752 file_path=keyval_path)
1753
1754
1755 def _write_keyvals_before_job(self, keyval_dict):
1756 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1757
1758
1759 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001760 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001761 host.hostname)
1762 platform, all_labels = host.platform_and_labels()
1763 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1764 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1765
1766
showard8cc058f2009-09-08 16:26:33 +00001767class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001768 """
1769 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1770 """
1771
1772 TASK_TYPE = None
1773 host = None
1774 queue_entry = None
1775
showardd1195652009-12-08 22:21:02 +00001776 def __init__(self, task, extra_command_args):
1777 super(SpecialAgentTask, self).__init__()
1778
showarded2afea2009-07-07 20:54:07 +00001779 assert self.TASK_TYPE is not None, (
1780 'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001781
jamesrenc44ae992010-02-19 00:12:54 +00001782 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001783 self.queue_entry = None
1784 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001785 self.queue_entry = scheduler_models.HostQueueEntry(
1786 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001787
showarded2afea2009-07-07 20:54:07 +00001788 self.task = task
1789 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001790
1791
showard8cc058f2009-09-08 16:26:33 +00001792 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001793 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1794
1795
1796 def _command_line(self):
1797 return _autoserv_command_line(self.host.hostname,
1798 self._extra_command_args,
1799 queue_entry=self.queue_entry)
1800
1801
1802 def _working_directory(self):
1803 return self.task.execution_path()
1804
1805
1806 @property
1807 def owner_username(self):
1808 if self.task.requested_by:
1809 return self.task.requested_by.login
1810 return None
showard8cc058f2009-09-08 16:26:33 +00001811
1812
showarded2afea2009-07-07 20:54:07 +00001813 def prolog(self):
1814 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001815 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001816 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001817
1818
showardde634ee2009-01-30 01:44:24 +00001819 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001820 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001821
showard2fe3f1d2009-07-06 20:19:11 +00001822 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001823 return # don't fail metahost entries, they'll be reassigned
1824
showard2fe3f1d2009-07-06 20:19:11 +00001825 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001826 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001827 return # entry has been aborted
1828
showard2fe3f1d2009-07-06 20:19:11 +00001829 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001830 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001831 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001832 self._write_keyval_after_job(queued_key, queued_time)
1833 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001834
showard8cc058f2009-09-08 16:26:33 +00001835 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001836 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001837 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001838 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001839
showard8cc058f2009-09-08 16:26:33 +00001840 pidfile_id = _drone_manager.get_pidfile_id_from(
1841 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001842 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001843 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001844
1845 if self.queue_entry.job.parse_failed_repair:
1846 self._parse_results([self.queue_entry])
1847 else:
1848 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001849
1850
1851 def cleanup(self):
1852 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001853
1854 # We will consider an aborted task to be "Failed"
1855 self.task.finish(bool(self.success))
1856
showardf85a0b72009-10-07 20:48:45 +00001857 if self.monitor:
1858 if self.monitor.has_process():
1859 self._copy_results([self.task])
1860 if self.monitor.pidfile_id is not None:
1861 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001862
1863
1864class RepairTask(SpecialAgentTask):
1865 TASK_TYPE = models.SpecialTask.Task.REPAIR
1866
1867
showardd1195652009-12-08 22:21:02 +00001868 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001869 """\
1870 task: SpecialTask for this repair; its queue entry (if any) is marked failed if the repair fails.
1871 """
1872 protection = host_protections.Protection.get_string(
1873 task.host.protection)
1874 # normalize the protection name
1875 protection = host_protections.Protection.get_attr_name(protection)
1876
1877 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001878 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001879
1880 # *don't* include the queue entry in IDs -- if the queue entry is
1881 # aborted, we want to leave the repair task running
1882 self._set_ids(host=self.host)
1883
1884
1885 def prolog(self):
1886 super(RepairTask, self).prolog()
1887 logging.info("repair_task starting")
1888 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001889
1890
jadmanski0afbb632008-06-06 21:10:57 +00001891 def epilog(self):
1892 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001893
jadmanski0afbb632008-06-06 21:10:57 +00001894 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001895 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001896 else:
showard8cc058f2009-09-08 16:26:33 +00001897 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001898 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001899 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001900
1901
showarded2afea2009-07-07 20:54:07 +00001902class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001903 def _copy_to_results_repository(self):
1904 if not self.queue_entry or self.queue_entry.meta_host:
1905 return
1906
1907 self.queue_entry.set_execution_subdir()
1908 log_name = os.path.basename(self.task.execution_path())
1909 source = os.path.join(self.task.execution_path(), 'debug',
1910 'autoserv.DEBUG')
1911 destination = os.path.join(
1912 self.queue_entry.execution_path(), log_name)
1913
1914 self.monitor.try_copy_to_results_repository(
1915 source, destination_path=destination)
1916
1917
showard170873e2009-01-07 00:22:26 +00001918 def epilog(self):
1919 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001920
showard775300b2009-09-09 15:30:50 +00001921 if self.success:
1922 return
showard8fe93b52008-11-18 17:53:22 +00001923
showard775300b2009-09-09 15:30:50 +00001924 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001925
showard775300b2009-09-09 15:30:50 +00001926 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001927 # effectively ignore failure for these hosts
1928 self.success = True
showard775300b2009-09-09 15:30:50 +00001929 return
1930
1931 if self.queue_entry:
1932 self.queue_entry.requeue()
1933
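            # if a repair task already exists for this queue entry, a prior
            # repair attempt has presumably failed; mark the host and fail
            # the entry rather than requeue indefinitely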
1934 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001935 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001936 queue_entry__id=self.queue_entry.id):
1937 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1938 self._fail_queue_entry()
1939 return
1940
showard9bb960b2009-11-19 01:02:11 +00001941 queue_entry = models.HostQueueEntry.objects.get(
1942 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001943 else:
1944 queue_entry = None
1945
1946 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001947 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001948 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001949 queue_entry=queue_entry,
1950 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001951
showard8fe93b52008-11-18 17:53:22 +00001952
1953class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001954 TASK_TYPE = models.SpecialTask.Task.VERIFY
1955
1956
showardd1195652009-12-08 22:21:02 +00001957 def __init__(self, task):
1958 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001959 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001960
1961
jadmanski0afbb632008-06-06 21:10:57 +00001962 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001963 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001964
showardb18134f2009-03-20 20:52:18 +00001965 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001966 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001967 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1968 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001969
showarded2afea2009-07-07 20:54:07 +00001970 # Delete any other queued verifies for this host. One verify will do
1971 # and there's no need to keep records of other requests.
1972 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001973 host__id=self.host.id,
1974 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001975 is_active=False, is_complete=False)
1976 queued_verifies = queued_verifies.exclude(id=self.task.id)
1977 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001978
mbligh36768f02008-02-22 18:28:33 +00001979
jadmanski0afbb632008-06-06 21:10:57 +00001980 def epilog(self):
1981 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001982 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001983 if self.queue_entry:
1984 self.queue_entry.on_pending()
1985 else:
1986 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001987
1988
mbligh4608b002010-01-05 18:22:35 +00001989class CleanupTask(PreJobTask):
1990 # note this can also run post-job, but when it does, it's running standalone
1991 # against the host (not related to the job), so it's not considered a
1992 # PostJobTask
1993
1994 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1995
1996
1997 def __init__(self, task, recover_run_monitor=None):
1998 super(CleanupTask, self).__init__(task, ['--cleanup'])
1999 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
2000
2001
2002 def prolog(self):
2003 super(CleanupTask, self).prolog()
2004 logging.info("starting cleanup task for host: %s", self.host.hostname)
2005 self.host.set_status(models.Host.Status.CLEANING)
2006 if self.queue_entry:
2007 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2008
2009
2010 def _finish_epilog(self):
2011 if not self.queue_entry or not self.success:
2012 return
2013
2014 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2015 should_run_verify = (
2016 self.queue_entry.job.run_verify
2017 and self.host.protection != do_not_verify_protection)
2018 if should_run_verify:
2019 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
2020 models.SpecialTask.objects.create(
2021 host=models.Host.objects.get(id=self.host.id),
2022 queue_entry=entry,
2023 task=models.SpecialTask.Task.VERIFY)
2024 else:
2025 self.queue_entry.on_pending()
2026
2027
2028 def epilog(self):
2029 super(CleanupTask, self).epilog()
2030
2031 if self.success:
2032 self.host.update_field('dirty', 0)
2033 self.host.set_status(models.Host.Status.READY)
2034
2035 self._finish_epilog()
2036
2037
showarda9545c02009-12-18 22:44:26 +00002038class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
2039 """
2040 Common functionality for QueueTask and HostlessQueueTask
2041 """
2042 def __init__(self, queue_entries):
2043 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00002044 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00002045 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00002046
2047
showard73ec0442009-02-07 02:05:20 +00002048 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002049 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002050
2051
jamesrenc44ae992010-02-19 00:12:54 +00002052 def _write_control_file(self, execution_path):
2053 control_path = _drone_manager.attach_file_to_execution(
2054 execution_path, self.job.control_file)
2055 return control_path
2056
2057
showardd1195652009-12-08 22:21:02 +00002058 def _command_line(self):
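        # Assembles an autoserv invocation; roughly (illustrative shape only,
        # the exact flags come from _autoserv_command_line):
        #   autoserv -P <execution_tag> -n <control_path> ... [-c]
        # where '-c' is appended for client-side (non-server) jobs.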
jamesrenc44ae992010-02-19 00:12:54 +00002059 execution_path = self.queue_entries[0].execution_path()
2060 control_path = self._write_control_file(execution_path)
2061 hostnames = ','.join(entry.host.hostname
2062 for entry in self.queue_entries
2063 if not entry.is_hostless())
2064
2065 execution_tag = self.queue_entries[0].execution_tag()
2066 params = _autoserv_command_line(
2067 hostnames,
2068 ['-P', execution_tag, '-n',
2069 _drone_manager.absolute_path(control_path)],
2070 job=self.job, verbose=False)
2071
2072 if not self.job.is_server_job():
2073 params.append('-c')
2074
2075 return params
showardd1195652009-12-08 22:21:02 +00002076
2077
2078 @property
2079 def num_processes(self):
2080 return len(self.queue_entries)
2081
2082
2083 @property
2084 def owner_username(self):
2085 return self.job.owner
2086
2087
2088 def _working_directory(self):
2089 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002093 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00002094 keyval_dict = self.job.keyval_dict()
2095 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00002096 group_name = self.queue_entries[0].get_group_name()
2097 if group_name:
2098 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002099 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002100 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002101 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002102 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002103
2104
showard35162b02009-03-03 02:17:30 +00002105 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002106 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002107 _drone_manager.write_lines_to_file(error_file_path,
2108 [_LOST_PROCESS_ERROR])
2109
2110
showardd3dc1992009-04-22 21:01:40 +00002111 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002112 if not self.monitor:
2113 return
2114
showardd9205182009-04-27 20:09:55 +00002115 self._write_job_finished()
2116
showard35162b02009-03-03 02:17:30 +00002117 if self.monitor.lost_process:
2118 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002119
jadmanskif7fa2cc2008-10-01 14:13:23 +00002120
showardcbd74612008-11-19 21:42:02 +00002121 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002122 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002123 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002124 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002125 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002126
2127
jadmanskif7fa2cc2008-10-01 14:13:23 +00002128 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002129 if not self.monitor or not self.monitor.has_process():
2130 return
2131
jadmanskif7fa2cc2008-10-01 14:13:23 +00002132 # build up sets of all the aborted_by and aborted_on values
2133 aborted_by, aborted_on = set(), set()
2134 for queue_entry in self.queue_entries:
2135 if queue_entry.aborted_by:
2136 aborted_by.add(queue_entry.aborted_by)
2137 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2138 aborted_on.add(t)
2139
2140 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00002141 # TODO(showard): this conditional is now obsolete, we just need to leave
2142 # it in temporarily for backwards compatibility over upgrades. delete
2143 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00002144 assert len(aborted_by) <= 1
2145 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002146 aborted_by_value = aborted_by.pop()
2147 aborted_on_value = max(aborted_on)
2148 else:
2149 aborted_by_value = 'autotest_system'
2150 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002151
showarda0382352009-02-11 23:36:43 +00002152 self._write_keyval_after_job("aborted_by", aborted_by_value)
2153 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002154
showardcbd74612008-11-19 21:42:02 +00002155 aborted_on_string = str(datetime.datetime.fromtimestamp(
2156 aborted_on_value))
2157 self._write_status_comment('Job aborted by %s on %s' %
2158 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002159
2160
jadmanski0afbb632008-06-06 21:10:57 +00002161 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002162 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002163 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002164 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002165
2166
jadmanski0afbb632008-06-06 21:10:57 +00002167 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002168 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002169 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002170
2171
2172class QueueTask(AbstractQueueTask):
2173 def __init__(self, queue_entries):
2174 super(QueueTask, self).__init__(queue_entries)
2175 self._set_ids(queue_entries=queue_entries)
2176
2177
2178 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002179 self._check_queue_entry_statuses(
2180 self.queue_entries,
2181 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
2182 models.HostQueueEntry.Status.RUNNING),
2183 allowed_host_statuses=(models.Host.Status.PENDING,
2184 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00002185
2186 super(QueueTask, self).prolog()
2187
2188 for queue_entry in self.queue_entries:
2189 self._write_host_keyvals(queue_entry.host)
2190 queue_entry.host.set_status(models.Host.Status.RUNNING)
2191 queue_entry.host.update_field('dirty', 1)
2192 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2193 # TODO(gps): Remove this if nothing needs it anymore.
2194 # A potential user is: tko/parser
2195 self.job.write_to_machines_file(self.queue_entries[0])
2196
2197
2198 def _finish_task(self):
2199 super(QueueTask, self)._finish_task()
2200
2201 for queue_entry in self.queue_entries:
2202 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002203
2204
mbligh4608b002010-01-05 18:22:35 +00002205class HostlessQueueTask(AbstractQueueTask):
2206 def __init__(self, queue_entry):
2207 super(HostlessQueueTask, self).__init__([queue_entry])
2208 self.queue_entry_ids = [queue_entry.id]
2209
2210
2211 def prolog(self):
2212 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2213 super(HostlessQueueTask, self).prolog()
2214
2215
mbligh4608b002010-01-05 18:22:35 +00002216 def _finish_task(self):
2217 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00002218 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00002219
2220
showardd3dc1992009-04-22 21:01:40 +00002221class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002222 def __init__(self, queue_entries, log_file_name):
2223 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002224
showardd1195652009-12-08 22:21:02 +00002225 self.queue_entries = queue_entries
2226
showardd3dc1992009-04-22 21:01:40 +00002227 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002228 self._autoserv_monitor.attach_to_existing_process(
2229 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002230
showardd1195652009-12-08 22:21:02 +00002231
2232 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002233 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002234 return 'true'
2235 return self._generate_command(
2236 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002237
2238
2239 def _generate_command(self, results_dir):
2240 raise NotImplementedError('Subclasses must override this')
2241
2242
showardd1195652009-12-08 22:21:02 +00002243 @property
2244 def owner_username(self):
2245 return self.queue_entries[0].job.owner
2246
2247
2248 def _working_directory(self):
2249 return self._get_consistent_execution_path(self.queue_entries)
2250
2251
2252 def _paired_with_monitor(self):
2253 return self._autoserv_monitor
2254
2255
showardd3dc1992009-04-22 21:01:40 +00002256 def _job_was_aborted(self):
2257 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002258 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002259 queue_entry.update_from_database()
2260 if was_aborted is None: # first queue entry
2261 was_aborted = bool(queue_entry.aborted)
2262 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2263 email_manager.manager.enqueue_notify_email(
2264 'Inconsistent abort state',
2265 'Queue entries have inconsistent abort state: ' +
2266 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2267 # don't crash here, just assume true
2268 return True
2269 return was_aborted
2270
2271
showardd1195652009-12-08 22:21:02 +00002272 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002273 if self._job_was_aborted():
2274 return models.HostQueueEntry.Status.ABORTED
2275
2276 # we'll use a PidfileRunMonitor to read the autoserv exit status
2277 if self._autoserv_monitor.exit_code() == 0:
2278 return models.HostQueueEntry.Status.COMPLETED
2279 return models.HostQueueEntry.Status.FAILED
2280
2281
showardd3dc1992009-04-22 21:01:40 +00002282 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002283 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002284 queue_entry.set_status(status)
2285
2286
2287 def abort(self):
2288 # override AgentTask.abort() to avoid killing the process and ending
2289 # the task. post-job tasks continue when the job is aborted.
2290 pass
2291
2292
mbligh4608b002010-01-05 18:22:35 +00002293 def _pidfile_label(self):
2294 # '.autoserv_execute' -> 'autoserv'
2295 return self._pidfile_name()[1:-len('_execute')]
2296
2297
showard9bb960b2009-11-19 01:02:11 +00002298class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002299 """
2300 Task responsible for
2301 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2302 * copying logs to the results repository
2303 * spawning CleanupTasks for hosts, if necessary
2304 * spawning a FinalReparseTask for the job
2305 """
showardd1195652009-12-08 22:21:02 +00002306 def __init__(self, queue_entries, recover_run_monitor=None):
2307 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002308 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002309 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002310 self._set_ids(queue_entries=queue_entries)
2311
2312
2313 def _generate_command(self, results_dir):
2314 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002315 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00002316 return [_autoserv_path, '-p',
2317 '--pidfile-label=%s' % self._pidfile_label(),
2318 '--use-existing-results', '--collect-crashinfo',
2319 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002320
2321
showardd1195652009-12-08 22:21:02 +00002322 @property
2323 def num_processes(self):
2324 return len(self.queue_entries)
2325
2326
2327 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002328 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002329
2330
showardd3dc1992009-04-22 21:01:40 +00002331 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002332 self._check_queue_entry_statuses(
2333 self.queue_entries,
2334 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2335 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002336
showardd3dc1992009-04-22 21:01:40 +00002337 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002338
2339
showardd3dc1992009-04-22 21:01:40 +00002340 def epilog(self):
2341 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002342 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002343 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002344
showard9bb960b2009-11-19 01:02:11 +00002345
2346 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002347 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002348 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002349 models.HostQueueEntry.Status.COMPLETED)
2350 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2351 else:
2352 final_success = False
2353 num_tests_failed = 0
2354
showard9bb960b2009-11-19 01:02:11 +00002355 reboot_after = self._job.reboot_after
2356 do_reboot = (
2357 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002358 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002359 or reboot_after == model_attributes.RebootAfter.ALWAYS
2360 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002361 and final_success and num_tests_failed == 0))
2362
showardd1195652009-12-08 22:21:02 +00002363 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002364 if do_reboot:
2365 # don't pass the queue entry to the CleanupTask. if the cleanup
2366 # fails, the job doesn't care -- it's over.
2367 models.SpecialTask.objects.create(
2368 host=models.Host.objects.get(id=queue_entry.host.id),
2369 task=models.SpecialTask.Task.CLEANUP,
2370 requested_by=self._job.owner_model())
2371 else:
2372 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002373
2374
showard0bbfc212009-04-29 21:06:13 +00002375 def run(self):
showard597bfd32009-05-08 18:22:50 +00002376 autoserv_exit_code = self._autoserv_monitor.exit_code()
2377 # only run if Autoserv exited due to some signal. if we have no exit
2378 # code, assume something bad (and signal-like) happened.
2379 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002380 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002381 else:
2382 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002383
2384
mbligh4608b002010-01-05 18:22:35 +00002385class SelfThrottledPostJobTask(PostJobTask):
2386 """
2387 Special AgentTask subclass that maintains its own global process limit.
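
    Subclasses supply the ceiling by overriding _max_processes(). Since the
    counter is stored on the class, each subclass is throttled independently.
    A hypothetical subclass, as a sketch:

        class ExampleArchiver(SelfThrottledPostJobTask):
            @classmethod
            def _max_processes(cls):
                return 4  # illustrative limit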
2388 """
2389 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002390
2391
mbligh4608b002010-01-05 18:22:35 +00002392 @classmethod
2393 def _increment_running_processes(cls):
2394 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002395
mblighd5c95802008-03-05 00:33:46 +00002396
mbligh4608b002010-01-05 18:22:35 +00002397 @classmethod
2398 def _decrement_running_processes(cls):
2399 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002400
2401
mbligh4608b002010-01-05 18:22:35 +00002402 @classmethod
2403 def _max_processes(cls):
2404 raise NotImplementedError
2405
2406
2407 @classmethod
2408 def _can_run_new_process(cls):
2409 return cls._num_running_processes < cls._max_processes()
2410
2411
2412 def _process_started(self):
2413 return bool(self.monitor)
2414
2415
2416 def tick(self):
2417 # override tick to keep trying to start until the process count goes
2418 # down and we can, at which point we revert to default behavior
2419 if self._process_started():
2420 super(SelfThrottledPostJobTask, self).tick()
2421 else:
2422 self._try_starting_process()
2423
2424
2425 def run(self):
2426 # override run() to not actually run unless we can
2427 self._try_starting_process()
2428
2429
2430 def _try_starting_process(self):
2431 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002432 return
2433
mbligh4608b002010-01-05 18:22:35 +00002434 # actually run the command
2435 super(SelfThrottledPostJobTask, self).run()
2436 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002437
mblighd5c95802008-03-05 00:33:46 +00002438
mbligh4608b002010-01-05 18:22:35 +00002439 def finished(self, success):
2440 super(SelfThrottledPostJobTask, self).finished(success)
2441 if self._process_started():
2442 self._decrement_running_processes()


class FinalReparseTask(SelfThrottledPostJobTask):
    def __init__(self, queue_entries):
        super(FinalReparseTask, self).__init__(queue_entries,
                                               log_file_name='.parse.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                results_dir]
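    # Presumably these flags ask the parser to write a pidfile, parse at
    # level 2, reparse existing results (-r), and operate on the single
    # results directory given by -o; see tko/parse.py for the authoritative
    # option meanings.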


    @property
    def num_processes(self):
        return 0  # don't include parser processes in accounting
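        # Parse processes are already capped by _max_processes() below, so
        # they are deliberately excluded from the drone manager's per-drone
        # process accounting.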


    def _pidfile_name(self):
        return drone_manager.PARSER_PID_FILE


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_parse_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))

        super(FinalReparseTask, self).prolog()


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._archive_results(self.queue_entries)


class ArchiveResultsTask(SelfThrottledPostJobTask):
    _ARCHIVING_FAILED_FILE = '.archiver_failed'

    def __init__(self, queue_entries):
        super(ArchiveResultsTask, self).__init__(queue_entries,
                                                 log_file_name='.archiving.log')
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]


    def _pidfile_name(self):
        return drone_manager.ARCHIVER_PID_FILE


    def _generate_command(self, results_dir):
        return [_autoserv_path, '-p',
                '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
                '--use-existing-results', '--control-filename=control.archive',
                os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
                             'archive_results.control.srv')]
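    # Archiving reuses autoserv itself: it re-enters the existing results
    # directory and runs the bundled archive_results.control.srv control
    # file; --control-filename=control.archive presumably keeps the
    # archiver's control file from clobbering the job's original control
    # file in the same results directory.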


    @classmethod
    def _max_processes(cls):
        return scheduler_config.config.max_transfer_processes


    def prolog(self):
        self._check_queue_entry_statuses(
                self.queue_entries,
                allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))

        super(ArchiveResultsTask, self).prolog()


    def epilog(self):
        super(ArchiveResultsTask, self).epilog()
        if not self.success and self._paired_with_monitor().has_process():
            failed_file = os.path.join(self._working_directory(),
                                       self._ARCHIVING_FAILED_FILE)
            paired_process = self._paired_with_monitor().get_process()
            _drone_manager.write_lines_to_file(
                    failed_file, ['Archiving failed with exit code %s'
                                  % self.monitor.exit_code()],
                    paired_with_process=paired_process)
        self._set_all_statuses(self._final_status())
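        # On failure, the .archiver_failed marker is written next to the
        # results on the drone that ran the archiver, so the failure is
        # visible alongside the job's logs; the queue entries still advance
        # to their final status either way.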


if __name__ == '__main__':
    main()