#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None

def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60


def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # This is helpful for debugging why stuff a scheduler launches is
    # misbehaving.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
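    """Request a graceful shutdown: the main scheduler loop exits after the
    current tick completes."""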
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
                               for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
                              will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
                         object was supplied, this will be used to lookup the
                         Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args


class BaseDispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
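        """Run one scheduling cycle: refresh drone state, run periodic cleanup,
        process aborts and recurring runs, schedule special tasks and new jobs,
        and drive all active agents."""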
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
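        """Wrap agent_task in an Agent, add it to the dispatch list and index it
        by its host and queue entry ids."""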
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
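        """Recover agent tasks for processes that were active when the scheduler
        last shut down, then reverify any remaining active hosts."""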
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                                                                 % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
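        """Assign hosts to pending queue entries and start their pre-job tasks.
        Hostless jobs and unassigned atomic group entries are dispatched through
        their own paths."""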
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
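        """Abort queue entries flagged as aborted but not yet complete, then let
        each affected job stop its remaining entries if necessary."""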
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
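        """Apply process throttling: zero-process agents may always start, while
        other agents are limited by drone capacity and the per-cycle process
        limit."""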
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True


    def _handle_agents(self):
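        """Start agents that pass the throttling checks, tick every started
        agent, and remove agents that have finished."""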
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


SiteDispatcher = utils.import_site_class(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'SiteDispatcher', BaseDispatcher)

class Dispatcher(SiteDispatcher):
    pass


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
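        """Launch command through the drone manager (optionally niced) and keep
        the resulting pidfile id for later status queries."""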
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                 update its internal state.
1020 is_done() - Returns True if the task is finished.
1021 abort() - Called when an abort has been requested. The task must
1022 set its aborted attribute to True if it actually aborted.
1023
1024 The following attributes are required on all task objects:
1025 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001026 success - bool, True if this task succeeded.
1027 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1028 host_ids - A sequence of Host ids this task represents.
showard77182562009-06-10 00:16:05 +00001029 """
1030
1031
showard418785b2009-11-23 20:19:59 +00001032 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001033 """
showard8cc058f2009-09-08 16:26:33 +00001034 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001035 """
showard8cc058f2009-09-08 16:26:33 +00001036 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001037
showard77182562009-06-10 00:16:05 +00001038 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001039 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001040
showard8cc058f2009-09-08 16:26:33 +00001041 self.queue_entry_ids = task.queue_entry_ids
1042 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001043
showard8cc058f2009-09-08 16:26:33 +00001044 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001045 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001046
1047
jadmanski0afbb632008-06-06 21:10:57 +00001048 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001049 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001050 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001051 self.task.poll()
1052 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001053 self.finished = True
showardec113162008-05-08 00:52:49 +00001054
1055
jadmanski0afbb632008-06-06 21:10:57 +00001056 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001057 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001058
1059
showardd3dc1992009-04-22 21:01:40 +00001060 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001061 if self.task:
1062 self.task.abort()
1063 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001064 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001065 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001066
showardd3dc1992009-04-22 21:01:40 +00001067
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001068class BaseAgentTask(object):
showardd1195652009-12-08 22:21:02 +00001069 class _NullMonitor(object):
1070 pidfile_id = None
1071
1072 def has_process(self):
1073 return True
1074
1075
1076 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001077 """
showardd1195652009-12-08 22:21:02 +00001078 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001079 """
jadmanski0afbb632008-06-06 21:10:57 +00001080 self.done = False
showardd1195652009-12-08 22:21:02 +00001081 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001082 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001083 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001084 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001085 self.queue_entry_ids = []
1086 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001087 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001088
1089
1090 def _set_ids(self, host=None, queue_entries=None):
1091 if queue_entries and queue_entries != [None]:
1092 self.host_ids = [entry.host.id for entry in queue_entries]
1093 self.queue_entry_ids = [entry.id for entry in queue_entries]
1094 else:
1095 assert host
1096 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001097
1098
jadmanski0afbb632008-06-06 21:10:57 +00001099 def poll(self):
showard08a36412009-05-05 01:01:13 +00001100 if not self.started:
1101 self.start()
showardd1195652009-12-08 22:21:02 +00001102 if not self.done:
1103 self.tick()
showard08a36412009-05-05 01:01:13 +00001104
1105
1106 def tick(self):
showardd1195652009-12-08 22:21:02 +00001107 assert self.monitor
1108 exit_code = self.monitor.exit_code()
1109 if exit_code is None:
1110 return
mbligh36768f02008-02-22 18:28:33 +00001111
showardd1195652009-12-08 22:21:02 +00001112 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001113 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001114
1115
jadmanski0afbb632008-06-06 21:10:57 +00001116 def is_done(self):
1117 return self.done
mbligh36768f02008-02-22 18:28:33 +00001118
1119
jadmanski0afbb632008-06-06 21:10:57 +00001120 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001121 if self.done:
showardd1195652009-12-08 22:21:02 +00001122 assert self.started
showard08a36412009-05-05 01:01:13 +00001123 return
showardd1195652009-12-08 22:21:02 +00001124 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001125 self.done = True
1126 self.success = success
1127 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001128
1129
jadmanski0afbb632008-06-06 21:10:57 +00001130 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001131 """
1132 To be overridden.
1133 """
showarded2afea2009-07-07 20:54:07 +00001134 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001135 self.register_necessary_pidfiles()
1136
1137
1138 def _log_file(self):
1139 if not self._log_file_name:
1140 return None
1141 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001142
mbligh36768f02008-02-22 18:28:33 +00001143
jadmanski0afbb632008-06-06 21:10:57 +00001144 def cleanup(self):
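        """Copy this task's log file, if any, to the results repository."""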
showardd1195652009-12-08 22:21:02 +00001145 log_file = self._log_file()
1146 if self.monitor and log_file:
1147 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001148
1149
jadmanski0afbb632008-06-06 21:10:57 +00001150 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001151 """
1152 To be overridden.
1153 """
jadmanski0afbb632008-06-06 21:10:57 +00001154 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001155 logging.info("%s finished with success=%s", type(self).__name__,
1156 self.success)
1157
mbligh36768f02008-02-22 18:28:33 +00001158
1159
jadmanski0afbb632008-06-06 21:10:57 +00001160 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001161 if not self.started:
1162 self.prolog()
1163 self.run()
1164
1165 self.started = True
1166
1167
1168 def abort(self):
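        """
        Kill the underlying process, if any, then mark the task aborted and
        run cleanup().
        """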
1169 if self.monitor:
1170 self.monitor.kill()
1171 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001172 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001173 self.cleanup()
1174
1175
showarded2afea2009-07-07 20:54:07 +00001176 def _get_consistent_execution_path(self, execution_entries):
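        """
        Return the execution path shared by all of the given entries,
        asserting that they agree on it.
        """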
1177 first_execution_path = execution_entries[0].execution_path()
1178 for execution_entry in execution_entries[1:]:
1179 assert execution_entry.execution_path() == first_execution_path, (
1180 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1181 execution_entry,
1182 first_execution_path,
1183 execution_entries[0]))
1184 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001185
1186
showarded2afea2009-07-07 20:54:07 +00001187 def _copy_results(self, execution_entries, use_monitor=None):
1188 """
1189 @param execution_entries: list of objects with execution_path() method
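        @param use_monitor: optional PidfileRunMonitor to copy results with;
                defaults to self.monitor.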
1190 """
showard6d1c1432009-08-20 23:30:39 +00001191 if use_monitor is not None and not use_monitor.has_process():
1192 return
1193
showarded2afea2009-07-07 20:54:07 +00001194 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001195 if use_monitor is None:
1196 assert self.monitor
1197 use_monitor = self.monitor
1198 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001199 execution_path = self._get_consistent_execution_path(execution_entries)
1200 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001201 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001202
showarda1e74b32009-05-12 17:32:04 +00001203
1204 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001205 for queue_entry in queue_entries:
1206 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001207
1208
mbligh4608b002010-01-05 18:22:35 +00001209 def _archive_results(self, queue_entries):
1210 for queue_entry in queue_entries:
1211 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001212
1213
showardd1195652009-12-08 22:21:02 +00001214 def _command_line(self):
1215 """
1216 Return the command line to run. Must be overridden.
1217 """
1218 raise NotImplementedError
1219
1220
1221 @property
1222 def num_processes(self):
1223 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001224 Return the number of processes forked by this BaseAgentTask's process.
1225 It may only be approximate. To be overridden if necessary.
showardd1195652009-12-08 22:21:02 +00001226 """
1227 return 1
1228
1229
1230 def _paired_with_monitor(self):
1231 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001232 If this BaseAgentTask's process must run on the same machine as some
showardd1195652009-12-08 22:21:02 +00001233 previous process, this method should be overridden to return a
1234 PidfileRunMonitor for that process.
1235 """
1236 return self._NullMonitor()
1237
1238
1239 @property
1240 def owner_username(self):
1241 """
1242 Return login of user responsible for this task. May be None. Must be
1243 overridden.
1244 """
1245 raise NotImplementedError
1246
1247
1248 def _working_directory(self):
1249 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001250 Return the directory where this BaseAgentTask's process executes.
1251 Must be overridden.
showardd1195652009-12-08 22:21:02 +00001252 """
1253 raise NotImplementedError
1254
1255
1256 def _pidfile_name(self):
1257 """
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001258 Return the name of the pidfile this BaseAgentTask's process uses. To be
showardd1195652009-12-08 22:21:02 +00001259 overridden if necessary.
1260 """
jamesrenc44ae992010-02-19 00:12:54 +00001261 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001262
1263
1264 def _check_paired_results_exist(self):
1265 if not self._paired_with_monitor().has_process():
1266 email_manager.manager.enqueue_notify_email(
1267 'No paired results in task',
1268 'No paired results in task %s at %s'
1269 % (self, self._paired_with_monitor().pidfile_id))
1270 self.finished(False)
1271 return False
1272 return True
1273
1274
1275 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001276 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001277 self.monitor = PidfileRunMonitor()
1278
1279
1280 def run(self):
1281 if not self._check_paired_results_exist():
1282 return
1283
1284 self._create_monitor()
1285 self.monitor.run(
1286 self._command_line(), self._working_directory(),
1287 num_processes=self.num_processes,
1288 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1289 pidfile_name=self._pidfile_name(),
1290 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001291 username=self.owner_username,
1292 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1293
1294
1295 def get_drone_hostnames_allowed(self):
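        """
        Return the drone hostnames this task may run on, taken from the job's
        drone set (or the owning user's/global default set); returns None when
        drone sets are disabled.
        """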
1296 if not models.DroneSet.drone_sets_enabled():
1297 return None
1298
1299 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1300 if not hqes:
1301 # Only special tasks could be missing host queue entries
1302 assert isinstance(self, SpecialAgentTask)
1303 return self._user_or_global_default_drone_set(
1304 self.task, self.task.requested_by)
1305
1306 job_ids = hqes.values_list('job', flat=True).distinct()
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001307 assert job_ids.count() == 1, ("BaseAgentTask's queue entries "
jamesren76fcf192010-04-21 20:39:50 +00001308 "span multiple jobs")
1309
1310 job = models.Job.objects.get(id=job_ids[0])
1311 drone_set = job.drone_set
1312 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001313 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001314
1315 return drone_set.get_drone_hostnames()
1316
1317
1318 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1319 """
1320 Returns the user's default drone set, if present.
1321
1322 Otherwise, returns the global default drone set.
1323 """
1324 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1325 if not user:
1326 logging.warn('%s had no owner; using default drone set',
1327 obj_with_owner)
1328 return default_hostnames
1329 if not user.drone_set:
1330 logging.warn('User %s has no default drone set, using global '
1331 'default', user.login)
1332 return default_hostnames
1333 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001334
1335
1336 def register_necessary_pidfiles(self):
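        """
        Register this task's pidfile, and its paired pidfile if present, with
        the drone manager.
        """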
1337 pidfile_id = _drone_manager.get_pidfile_id_from(
1338 self._working_directory(), self._pidfile_name())
1339 _drone_manager.register_pidfile(pidfile_id)
1340
1341 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1342 if paired_pidfile_id:
1343 _drone_manager.register_pidfile(paired_pidfile_id)
1344
1345
1346 def recover(self):
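        """
        Reattach to a process left over from a previous scheduler run; if no
        such process exists, leave the task unstarted so it runs normally.
        """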
1347 if not self._check_paired_results_exist():
1348 return
1349
1350 self._create_monitor()
1351 self.monitor.attach_to_existing_process(
1352 self._working_directory(), pidfile_name=self._pidfile_name(),
1353 num_processes=self.num_processes)
1354 if not self.monitor.has_process():
1355 # no process to recover; wait to be started normally
1356 self.monitor = None
1357 return
1358
1359 self.started = True
1360 logging.info('Recovering process %s for %s at %s',
1361 self.monitor.get_process(), type(self).__name__,
1362 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001363
1364
mbligh4608b002010-01-05 18:22:35 +00001365 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1366 allowed_host_statuses=None):
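        """
        Raise a SchedulerError if any queue entry, or its host, is not in one
        of the allowed statuses; prologs use this as a consistency check.
        """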
jamesrenb8f3f352010-06-10 00:44:06 +00001367 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001368 for entry in queue_entries:
1369 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001370 raise host_scheduler.SchedulerError(
1371 '%s attempting to start entry with invalid status %s: '
1372 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001373 invalid_host_status = (
1374 allowed_host_statuses is not None
1375 and entry.host.status not in allowed_host_statuses)
1376 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001377 raise host_scheduler.SchedulerError(
1378 '%s attempting to start on queue entry with invalid '
1379 'host status %s: %s'
1380 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001381
1382
Yu-Ju Hong52ce11d2012-08-01 17:55:48 -07001383SiteAgentTask = utils.import_site_class(
1384 __file__, 'autotest_lib.scheduler.site_monitor_db',
1385 'SiteAgentTask', BaseAgentTask)
1386
1387class AgentTask(SiteAgentTask):
1388 pass
1389
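# The import_site_class() indirection lets a site-specific
# site_monitor_db.SiteAgentTask hook extra behaviour into every task; when no
# such site module is present, AgentTask is effectively just BaseAgentTask.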
1390
showardd9205182009-04-27 20:09:55 +00001391class TaskWithJobKeyvals(object):
1392 """AgentTask mixin providing functionality to help with job keyval files."""
1393 _KEYVAL_FILE = 'keyval'
1394 def _format_keyval(self, key, value):
1395 return '%s=%s' % (key, value)
1396
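    # A minimal sketch of the keyval file this mixin produces (the keys shown
    # are ones written by the methods below; the timestamp values are purely
    # illustrative):
    #
    #   job_queued=1268263435
    #   job_finished=1268265851
    #
    # Each line is one _format_keyval() entry, and the file stays
    # newline-terminated so later writers can append further keyvals.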
1397
1398 def _keyval_path(self):
1399 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001400 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001401
1402
1403 def _write_keyval_after_job(self, field, value):
1404 assert self.monitor
1405 if not self.monitor.has_process():
1406 return
1407 _drone_manager.write_lines_to_file(
1408 self._keyval_path(), [self._format_keyval(field, value)],
1409 paired_with_process=self.monitor.get_process())
1410
1411
1412 def _job_queued_keyval(self, job):
1413 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1414
1415
1416 def _write_job_finished(self):
1417 self._write_keyval_after_job("job_finished", int(time.time()))
1418
1419
showarddb502762009-09-09 15:31:20 +00001420 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1421 keyval_contents = '\n'.join(self._format_keyval(key, value)
1422 for key, value in keyval_dict.iteritems())
1423 # always end with a newline to allow additional keyvals to be written
1424 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001425 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001426 keyval_contents,
1427 file_path=keyval_path)
1428
1429
1430 def _write_keyvals_before_job(self, keyval_dict):
1431 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1432
1433
1434 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001435 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001436 host.hostname)
1437 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001438 all_labels = [urllib.quote(label) for label in all_labels]
showarddb502762009-09-09 15:31:20 +00001439 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1440 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1441
1442
showard8cc058f2009-09-08 16:26:33 +00001443class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001444 """
1445 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1446 """
1447
1448 TASK_TYPE = None
1449 host = None
1450 queue_entry = None
1451
showardd1195652009-12-08 22:21:02 +00001452 def __init__(self, task, extra_command_args):
1453 super(SpecialAgentTask, self).__init__()
1454
lmrb7c5d272010-04-16 06:34:04 +00001455 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001456
jamesrenc44ae992010-02-19 00:12:54 +00001457 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001458 self.queue_entry = None
1459 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001460 self.queue_entry = scheduler_models.HostQueueEntry(
1461 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001462
showarded2afea2009-07-07 20:54:07 +00001463 self.task = task
1464 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001465
1466
showard8cc058f2009-09-08 16:26:33 +00001467 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001468 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1469
1470
1471 def _command_line(self):
1472 return _autoserv_command_line(self.host.hostname,
1473 self._extra_command_args,
1474 queue_entry=self.queue_entry)
1475
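    # The extra args are what distinguish the special-task flavors defined
    # below: RepairTask supplies ['-R', '--host-protection', <level>],
    # VerifyTask supplies ['-v'] and CleanupTask supplies ['--cleanup']; the
    # rest of the autoserv invocation comes from _autoserv_command_line().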
1476
1477 def _working_directory(self):
1478 return self.task.execution_path()
1479
1480
1481 @property
1482 def owner_username(self):
1483 if self.task.requested_by:
1484 return self.task.requested_by.login
1485 return None
showard8cc058f2009-09-08 16:26:33 +00001486
1487
showarded2afea2009-07-07 20:54:07 +00001488 def prolog(self):
1489 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001490 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001491 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001492
1493
showardde634ee2009-01-30 01:44:24 +00001494 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001495 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001496
showard2fe3f1d2009-07-06 20:19:11 +00001497 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001498 return # don't fail metahost entries, they'll be reassigned
1499
showard2fe3f1d2009-07-06 20:19:11 +00001500 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001501 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001502 return # entry has been aborted
1503
showard2fe3f1d2009-07-06 20:19:11 +00001504 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001505 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001506 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001507 self._write_keyval_after_job(queued_key, queued_time)
1508 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001509
showard8cc058f2009-09-08 16:26:33 +00001510 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001511 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001512 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001513 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001514
showard8cc058f2009-09-08 16:26:33 +00001515 pidfile_id = _drone_manager.get_pidfile_id_from(
1516 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001517 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001518 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001519
1520 if self.queue_entry.job.parse_failed_repair:
1521 self._parse_results([self.queue_entry])
1522 else:
1523 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001524
1525
1526 def cleanup(self):
1527 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001528
1529 # We will consider an aborted task to be "Failed"
1530 self.task.finish(bool(self.success))
1531
showardf85a0b72009-10-07 20:48:45 +00001532 if self.monitor:
1533 if self.monitor.has_process():
1534 self._copy_results([self.task])
1535 if self.monitor.pidfile_id is not None:
1536 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001537
1538
1539class RepairTask(SpecialAgentTask):
1540 TASK_TYPE = models.SpecialTask.Task.REPAIR
1541
1542
showardd1195652009-12-08 22:21:02 +00001543 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001544 """\
1545 @param task: SpecialTask for this repair; its queue entry, if any, is marked failed when the repair fails.
1546 """
1547 protection = host_protections.Protection.get_string(
1548 task.host.protection)
1549 # normalize the protection name
1550 protection = host_protections.Protection.get_attr_name(protection)
1551
1552 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001553 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001554
1555 # *don't* include the queue entry in IDs -- if the queue entry is
1556 # aborted, we want to leave the repair task running
1557 self._set_ids(host=self.host)
1558
1559
1560 def prolog(self):
1561 super(RepairTask, self).prolog()
1562 logging.info("repair_task starting")
1563 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001564
1565
jadmanski0afbb632008-06-06 21:10:57 +00001566 def epilog(self):
1567 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001568
jadmanski0afbb632008-06-06 21:10:57 +00001569 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001570 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001571 else:
showard8cc058f2009-09-08 16:26:33 +00001572 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001573 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001574 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001575
1576
showarded2afea2009-07-07 20:54:07 +00001577class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001578 def _copy_to_results_repository(self):
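        """
        Copy this task's autoserv debug log into the job's results directory;
        skipped for metahost entries and for tasks without a queue entry.
        """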
1579 if not self.queue_entry or self.queue_entry.meta_host:
1580 return
1581
1582 self.queue_entry.set_execution_subdir()
1583 log_name = os.path.basename(self.task.execution_path())
1584 source = os.path.join(self.task.execution_path(), 'debug',
1585 'autoserv.DEBUG')
1586 destination = os.path.join(
1587 self.queue_entry.execution_path(), log_name)
1588
1589 self.monitor.try_copy_to_results_repository(
1590 source, destination_path=destination)
1591
1592
showard170873e2009-01-07 00:22:26 +00001593 def epilog(self):
1594 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001595
showard775300b2009-09-09 15:30:50 +00001596 if self.success:
1597 return
showard8fe93b52008-11-18 17:53:22 +00001598
showard775300b2009-09-09 15:30:50 +00001599 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001600
showard775300b2009-09-09 15:30:50 +00001601 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001602 # effectively ignore failure for these hosts
1603 self.success = True
showard775300b2009-09-09 15:30:50 +00001604 return
1605
1606 if self.queue_entry:
1607 self.queue_entry.requeue()
1608
1609 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001610 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001611 queue_entry__id=self.queue_entry.id):
1612 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1613 self._fail_queue_entry()
1614 return
1615
showard9bb960b2009-11-19 01:02:11 +00001616 queue_entry = models.HostQueueEntry.objects.get(
1617 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001618 else:
1619 queue_entry = None
1620
1621 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001622 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001623 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001624 queue_entry=queue_entry,
1625 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001626
showard8fe93b52008-11-18 17:53:22 +00001627
1628class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001629 TASK_TYPE = models.SpecialTask.Task.VERIFY
1630
1631
showardd1195652009-12-08 22:21:02 +00001632 def __init__(self, task):
1633 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001634 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001635
1636
jadmanski0afbb632008-06-06 21:10:57 +00001637 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001638 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001639
showardb18134f2009-03-20 20:52:18 +00001640 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001641 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001642 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1643 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001644
jamesren42318f72010-05-10 23:40:59 +00001645 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001646 # and there's no need to keep records of other requests.
1647 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001648 host__id=self.host.id,
1649 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001650 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001651 queued_verifies = queued_verifies.exclude(id=self.task.id)
1652 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001653
mbligh36768f02008-02-22 18:28:33 +00001654
jadmanski0afbb632008-06-06 21:10:57 +00001655 def epilog(self):
1656 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001657 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001658 if self.queue_entry:
1659 self.queue_entry.on_pending()
1660 else:
1661 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001662
1663
mbligh4608b002010-01-05 18:22:35 +00001664class CleanupTask(PreJobTask):
1665 # note this can also run post-job, but when it does, it's running standalone
1666 # against the host (not related to the job), so it's not considered a
1667 # PostJobTask
1668
1669 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1670
1671
1672 def __init__(self, task, recover_run_monitor=None):
1673 super(CleanupTask, self).__init__(task, ['--cleanup'])
1674 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1675
1676
1677 def prolog(self):
1678 super(CleanupTask, self).prolog()
1679 logging.info("starting cleanup task for host: %s", self.host.hostname)
1680 self.host.set_status(models.Host.Status.CLEANING)
1681 if self.queue_entry:
1682 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1683
1684
1685 def _finish_epilog(self):
1686 if not self.queue_entry or not self.success:
1687 return
1688
1689 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1690 should_run_verify = (
1691 self.queue_entry.job.run_verify
1692 and self.host.protection != do_not_verify_protection)
1693 if should_run_verify:
1694 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1695 models.SpecialTask.objects.create(
1696 host=models.Host.objects.get(id=self.host.id),
1697 queue_entry=entry,
1698 task=models.SpecialTask.Task.VERIFY)
1699 else:
1700 self.queue_entry.on_pending()
1701
1702
1703 def epilog(self):
1704 super(CleanupTask, self).epilog()
1705
1706 if self.success:
1707 self.host.update_field('dirty', 0)
1708 self.host.set_status(models.Host.Status.READY)
1709
1710 self._finish_epilog()
1711
1712
showarda9545c02009-12-18 22:44:26 +00001713class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1714 """
1715 Common functionality for QueueTask and HostlessQueueTask
1716 """
1717 def __init__(self, queue_entries):
1718 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001719 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001720 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001721
1722
showard73ec0442009-02-07 02:05:20 +00001723 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001724 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001725
1726
jamesrenc44ae992010-02-19 00:12:54 +00001727 def _write_control_file(self, execution_path):
1728 control_path = _drone_manager.attach_file_to_execution(
1729 execution_path, self.job.control_file)
1730 return control_path
1731
1732
showardd1195652009-12-08 22:21:02 +00001733 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001734 execution_path = self.queue_entries[0].execution_path()
1735 control_path = self._write_control_file(execution_path)
1736 hostnames = ','.join(entry.host.hostname
1737 for entry in self.queue_entries
1738 if not entry.is_hostless())
1739
1740 execution_tag = self.queue_entries[0].execution_tag()
1741 params = _autoserv_command_line(
1742 hostnames,
1743 ['-P', execution_tag, '-n',
1744 _drone_manager.absolute_path(control_path)],
1745 job=self.job, verbose=False)
1746
1747 if not self.job.is_server_job():
1748 params.append('-c')
1749
Dale Curtis30cb8eb2011-06-09 12:22:26 -07001750 if self.job.is_image_update_job():
1751 params += ['--image', self.job.update_image_path]
1752
jamesrenc44ae992010-02-19 00:12:54 +00001753 return params
showardd1195652009-12-08 22:21:02 +00001754
1755
1756 @property
1757 def num_processes(self):
1758 return len(self.queue_entries)
1759
1760
1761 @property
1762 def owner_username(self):
1763 return self.job.owner
1764
1765
1766 def _working_directory(self):
1767 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001768
1769
jadmanski0afbb632008-06-06 21:10:57 +00001770 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001771 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001772 keyval_dict = self.job.keyval_dict()
1773 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001774 group_name = self.queue_entries[0].get_group_name()
1775 if group_name:
1776 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001777 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001778 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001779 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001780 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001781
1782
showard35162b02009-03-03 02:17:30 +00001783 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001784 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001785 _drone_manager.write_lines_to_file(error_file_path,
1786 [_LOST_PROCESS_ERROR])
1787
1788
showardd3dc1992009-04-22 21:01:40 +00001789 def _finish_task(self):
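        """
        Write the job-finished keyval and, if autoserv's process was lost,
        leave an explanatory error file in the results.
        """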
showard08a36412009-05-05 01:01:13 +00001790 if not self.monitor:
1791 return
1792
showardd9205182009-04-27 20:09:55 +00001793 self._write_job_finished()
1794
showard35162b02009-03-03 02:17:30 +00001795 if self.monitor.lost_process:
1796 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001797
jadmanskif7fa2cc2008-10-01 14:13:23 +00001798
showardcbd74612008-11-19 21:42:02 +00001799 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001800 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001801 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001802 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001803 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001804
1805
jadmanskif7fa2cc2008-10-01 14:13:23 +00001806 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001807 if not self.monitor or not self.monitor.has_process():
1808 return
1809
jadmanskif7fa2cc2008-10-01 14:13:23 +00001810 # build up sets of all the aborted_by and aborted_on values
1811 aborted_by, aborted_on = set(), set()
1812 for queue_entry in self.queue_entries:
1813 if queue_entry.aborted_by:
1814 aborted_by.add(queue_entry.aborted_by)
1815 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1816 aborted_on.add(t)
1817
1818 # extract some actual, unique aborted by value and write it out
showard64a95952010-01-13 21:27:16 +00001819 # TODO(showard): this conditional is now obsolete, we just need to leave
1820 # it in temporarily for backwards compatibility over upgrades. delete
1821 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001822 assert len(aborted_by) <= 1
1823 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001824 aborted_by_value = aborted_by.pop()
1825 aborted_on_value = max(aborted_on)
1826 else:
1827 aborted_by_value = 'autotest_system'
1828 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001829
showarda0382352009-02-11 23:36:43 +00001830 self._write_keyval_after_job("aborted_by", aborted_by_value)
1831 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001832
showardcbd74612008-11-19 21:42:02 +00001833 aborted_on_string = str(datetime.datetime.fromtimestamp(
1834 aborted_on_value))
1835 self._write_status_comment('Job aborted by %s on %s' %
1836 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001837
1838
jadmanski0afbb632008-06-06 21:10:57 +00001839 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001840 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001841 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001842 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001843
1844
jadmanski0afbb632008-06-06 21:10:57 +00001845 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001846 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001847 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001848
1849
1850class QueueTask(AbstractQueueTask):
1851 def __init__(self, queue_entries):
1852 super(QueueTask, self).__init__(queue_entries)
1853 self._set_ids(queue_entries=queue_entries)
1854
1855
1856 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001857 self._check_queue_entry_statuses(
1858 self.queue_entries,
1859 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1860 models.HostQueueEntry.Status.RUNNING),
1861 allowed_host_statuses=(models.Host.Status.PENDING,
1862 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001863
1864 super(QueueTask, self).prolog()
1865
1866 for queue_entry in self.queue_entries:
1867 self._write_host_keyvals(queue_entry.host)
1868 queue_entry.host.set_status(models.Host.Status.RUNNING)
1869 queue_entry.host.update_field('dirty', 1)
1870 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1871 # TODO(gps): Remove this if nothing needs it anymore.
1872 # A potential user is: tko/parser
1873 self.job.write_to_machines_file(self.queue_entries[0])
1874
1875
1876 def _finish_task(self):
1877 super(QueueTask, self)._finish_task()
1878
1879 for queue_entry in self.queue_entries:
1880 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001881 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001882
1883
mbligh4608b002010-01-05 18:22:35 +00001884class HostlessQueueTask(AbstractQueueTask):
1885 def __init__(self, queue_entry):
1886 super(HostlessQueueTask, self).__init__([queue_entry])
1887 self.queue_entry_ids = [queue_entry.id]
1888
1889
1890 def prolog(self):
1891 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1892 super(HostlessQueueTask, self).prolog()
1893
1894
mbligh4608b002010-01-05 18:22:35 +00001895 def _finish_task(self):
1896 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001897 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001898
1899
showardd3dc1992009-04-22 21:01:40 +00001900class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001901 def __init__(self, queue_entries, log_file_name):
1902 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001903
showardd1195652009-12-08 22:21:02 +00001904 self.queue_entries = queue_entries
1905
showardd3dc1992009-04-22 21:01:40 +00001906 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001907 self._autoserv_monitor.attach_to_existing_process(
1908 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001909
showardd1195652009-12-08 22:21:02 +00001910
1911 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001912 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001913 return 'true'
1914 return self._generate_command(
1915 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001916
1917
1918 def _generate_command(self, results_dir):
1919 raise NotImplementedError('Subclasses must override this')
1920
1921
showardd1195652009-12-08 22:21:02 +00001922 @property
1923 def owner_username(self):
1924 return self.queue_entries[0].job.owner
1925
1926
1927 def _working_directory(self):
1928 return self._get_consistent_execution_path(self.queue_entries)
1929
1930
1931 def _paired_with_monitor(self):
1932 return self._autoserv_monitor
1933
1934
showardd3dc1992009-04-22 21:01:40 +00001935 def _job_was_aborted(self):
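        """
        Return whether this task's queue entries were aborted, emailing a
        warning (and assuming True) if their abort flags disagree.
        """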
1936 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001937 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001938 queue_entry.update_from_database()
1939 if was_aborted is None: # first queue entry
1940 was_aborted = bool(queue_entry.aborted)
1941 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001942 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1943 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001944 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001945 'Inconsistent abort state',
1946 'Queue entries have inconsistent abort state:\n' +
1947 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001948 # don't crash here, just assume true
1949 return True
1950 return was_aborted
1951
1952
showardd1195652009-12-08 22:21:02 +00001953 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00001954 if self._job_was_aborted():
1955 return models.HostQueueEntry.Status.ABORTED
1956
1957 # we'll use a PidfileRunMonitor to read the autoserv exit status
1958 if self._autoserv_monitor.exit_code() == 0:
1959 return models.HostQueueEntry.Status.COMPLETED
1960 return models.HostQueueEntry.Status.FAILED
1961
1962
showardd3dc1992009-04-22 21:01:40 +00001963 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00001964 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001965 queue_entry.set_status(status)
1966
1967
1968 def abort(self):
1969 # override AgentTask.abort() to avoid killing the process and ending
1970 # the task. post-job tasks continue when the job is aborted.
1971 pass
1972
1973
mbligh4608b002010-01-05 18:22:35 +00001974 def _pidfile_label(self):
1975 # '.autoserv_execute' -> 'autoserv'
1976 return self._pidfile_name()[1:-len('_execute')]
1977
1978
showard9bb960b2009-11-19 01:02:11 +00001979class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00001980 """
1981 Task responsible for
1982 * gathering uncollected logs (if Autoserv crashed hard or was killed)
1983 * copying logs to the results repository
1984 * spawning CleanupTasks for hosts, if necessary
1985 * spawning a FinalReparseTask for the job
1986 """
showardd1195652009-12-08 22:21:02 +00001987 def __init__(self, queue_entries, recover_run_monitor=None):
1988 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00001989 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001990 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00001991 self._set_ids(queue_entries=queue_entries)
1992
1993
1994 def _generate_command(self, results_dir):
1995 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00001996 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00001997 return [_autoserv_path, '-p',
1998 '--pidfile-label=%s' % self._pidfile_label(),
1999 '--use-existing-results', '--collect-crashinfo',
2000 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00002001
2002
showardd1195652009-12-08 22:21:02 +00002003 @property
2004 def num_processes(self):
2005 return len(self.queue_entries)
2006
2007
2008 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002009 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00002010
2011
showardd3dc1992009-04-22 21:01:40 +00002012 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002013 self._check_queue_entry_statuses(
2014 self.queue_entries,
2015 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
2016 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002017
showardd3dc1992009-04-22 21:01:40 +00002018 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002019
2020
showardd3dc1992009-04-22 21:01:40 +00002021 def epilog(self):
2022 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002023 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002024 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002025
showard9bb960b2009-11-19 01:02:11 +00002026
2027 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002028 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002029 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002030 models.HostQueueEntry.Status.COMPLETED)
2031 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2032 else:
2033 final_success = False
2034 num_tests_failed = 0
2035
showard9bb960b2009-11-19 01:02:11 +00002036 reboot_after = self._job.reboot_after
2037 do_reboot = (
2038 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002039 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002040 or reboot_after == model_attributes.RebootAfter.ALWAYS
2041 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002042 and final_success and num_tests_failed == 0))
2043
showardd1195652009-12-08 22:21:02 +00002044 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002045 if do_reboot:
2046 # don't pass the queue entry to the CleanupTask. if the cleanup
2047 # fails, the job doesn't care -- it's over.
2048 models.SpecialTask.objects.create(
2049 host=models.Host.objects.get(id=queue_entry.host.id),
2050 task=models.SpecialTask.Task.CLEANUP,
2051 requested_by=self._job.owner_model())
2052 else:
2053 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002054
2055
showard0bbfc212009-04-29 21:06:13 +00002056 def run(self):
showard597bfd32009-05-08 18:22:50 +00002057 autoserv_exit_code = self._autoserv_monitor.exit_code()
2058 # only run if Autoserv exited due to some signal. if we have no exit
2059 # code, assume something bad (and signal-like) happened.
2060 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002061 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002062 else:
2063 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002064
2065
mbligh4608b002010-01-05 18:22:35 +00002066class SelfThrottledPostJobTask(PostJobTask):
2067 """
2068 Special AgentTask subclass that maintains its own global process limit.
2069 """
2070 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002071
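    # A rough sketch of the throttling flow: run() defers to
    # _try_starting_process(), which launches the subprocess only while the
    # class-wide _num_running_processes count is below _max_processes();
    # until then tick() keeps retrying the start instead of polling a monitor.
    # Subclasses only need to supply _max_processes(), as the two classes
    # below do from scheduler_config limits.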
2072
mbligh4608b002010-01-05 18:22:35 +00002073 @classmethod
2074 def _increment_running_processes(cls):
2075 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002076
mblighd5c95802008-03-05 00:33:46 +00002077
mbligh4608b002010-01-05 18:22:35 +00002078 @classmethod
2079 def _decrement_running_processes(cls):
2080 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002081
2082
mbligh4608b002010-01-05 18:22:35 +00002083 @classmethod
2084 def _max_processes(cls):
2085 raise NotImplementedError
2086
2087
2088 @classmethod
2089 def _can_run_new_process(cls):
2090 return cls._num_running_processes < cls._max_processes()
2091
2092
2093 def _process_started(self):
2094 return bool(self.monitor)
2095
2096
2097 def tick(self):
2098 # override tick to keep trying to start until the process count goes
2099 # down and we can, at which point we revert to default behavior
2100 if self._process_started():
2101 super(SelfThrottledPostJobTask, self).tick()
2102 else:
2103 self._try_starting_process()
2104
2105
2106 def run(self):
2107 # override run() to not actually run unless we can
2108 self._try_starting_process()
2109
2110
2111 def _try_starting_process(self):
2112 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002113 return
2114
mbligh4608b002010-01-05 18:22:35 +00002115 # actually run the command
2116 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002117 if self._process_started():
2118 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002119
mblighd5c95802008-03-05 00:33:46 +00002120
mbligh4608b002010-01-05 18:22:35 +00002121 def finished(self, success):
2122 super(SelfThrottledPostJobTask, self).finished(success)
2123 if self._process_started():
2124 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002125
showard21baa452008-10-21 00:08:39 +00002126
mbligh4608b002010-01-05 18:22:35 +00002127class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002128 def __init__(self, queue_entries):
2129 super(FinalReparseTask, self).__init__(queue_entries,
2130 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002131 # don't use _set_ids, since we don't want to set the host_ids
2132 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002133
2134
2135 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002136 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002137 results_dir]
2138
2139
2140 @property
2141 def num_processes(self):
2142 return 0 # don't include parser processes in accounting
2143
2144
2145 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002146 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002147
2148
showard97aed502008-11-04 02:01:24 +00002149 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002150 def _max_processes(cls):
2151 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002152
2153
2154 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002155 self._check_queue_entry_statuses(
2156 self.queue_entries,
2157 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002158
showard97aed502008-11-04 02:01:24 +00002159 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002160
2161
2162 def epilog(self):
2163 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002164 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002165
2166
mbligh4608b002010-01-05 18:22:35 +00002167class ArchiveResultsTask(SelfThrottledPostJobTask):
showarde1575b52010-01-15 00:21:12 +00002168 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2169
mbligh4608b002010-01-05 18:22:35 +00002170 def __init__(self, queue_entries):
2171 super(ArchiveResultsTask, self).__init__(queue_entries,
2172 log_file_name='.archiving.log')
2173 # don't use _set_ids, since we don't want to set the host_ids
2174 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002175
2176
mbligh4608b002010-01-05 18:22:35 +00002177 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002178 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002179
2180
mbligh4608b002010-01-05 18:22:35 +00002181 def _generate_command(self, results_dir):
2182 return [_autoserv_path, '-p',
2183 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002184 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002185 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2186 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002187
2188
mbligh4608b002010-01-05 18:22:35 +00002189 @classmethod
2190 def _max_processes(cls):
2191 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002192
2193
2194 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002195 self._check_queue_entry_statuses(
2196 self.queue_entries,
2197 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2198
2199 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002200
2201
mbligh4608b002010-01-05 18:22:35 +00002202 def epilog(self):
2203 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002204 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002205 failed_file = os.path.join(self._working_directory(),
2206 self._ARCHIVING_FAILED_FILE)
2207 paired_process = self._paired_with_monitor().get_process()
2208 _drone_manager.write_lines_to_file(
2209 failed_file, ['Archiving failed with exit code %s'
2210 % self.monitor.exit_code()],
2211 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002212 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002213
2214
mbligh36768f02008-02-22 18:28:33 +00002215if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002216 main()