#!/usr/bin/python -u

"""
Autotest scheduler
"""


import common
import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback, urllib
import itertools, logging, weakref, gc

import MySQLdb

from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment

import django.db

from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.frontend.afe import model_attributes
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import gc_stats, host_scheduler, monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import scheduler_models
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_testing_mode = False
_drone_manager = None


def _parser_path_default(install_dir):
    return os.path.join(install_dir, 'tko', 'parse')
_parser_path_func = utils.import_site_function(
        __file__, 'autotest_lib.scheduler.site_monitor_db',
        'parser_path', _parser_path_default)
_parser_path = _parser_path_func(drones.AUTOTEST_INSTALL_DIR)


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60

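# Illustrative note (added comment, not in the original file):
# _get_pidfile_timeout_secs() reads 'pidfile_timeout_mins' from the SCHEDULER
# section of global_config and converts it to seconds. For example, a
# hypothetical global_config.ini entry of
#
#   [SCHEDULER]
#   pidfile_timeout_mins: 5
#
# would make the function return 300.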

def _site_init_monitor_db_dummy():
    return {}


def _verify_default_drone_set_exists():
    if (models.DroneSet.drone_sets_enabled() and
            not models.DroneSet.default_drone_set_name()):
        raise host_scheduler.SchedulerError(
                'Drone sets are enabled, but no default is set')


def _sanity_check():
    """Make sure the configs are consistent before starting the scheduler"""
    _verify_default_drone_set_exists()


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
122 "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    # Logging the environment is helpful for debugging misbehavior in the
    # processes this scheduler launches.
    logging.info('os.environ: %s', os.environ)

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    server = status_server.StatusServer()
    server.start()

    try:
        initialize()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown and not server._shutdown_scheduler:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
            logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def initialize():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    initialize_globals()
    scheduler_models.initialize()

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def initialize_globals():
    global _drone_manager
    _drone_manager = drone_manager.instance()


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
                      for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
                 will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
                 object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
        if job.is_image_update_job():
            autoserv_argv += ['--image', job.update_image_path]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

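# Illustrative sketch (added comment, not part of the original file): for a
# hypothetical call such as
#
#   _autoserv_command_line('host1,host2', ['-P', 'tag'], job=some_job)
#
# where some_job.owner == 'debug_user' and some_job.name == 'sleeptest', the
# returned list would look roughly like
#
#   [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#    '-m', 'host1,host2', '-u', 'debug_user', '-l', 'sleeptest',
#    '--verbose', '-P', 'tag']
#
# ('--image' is appended only when some_job.is_image_update_job() is true).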

class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = host_scheduler.HostScheduler(_db)
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
                global_config.global_config.get_config_value(
                        scheduler_config.CONFIG_SECTION,
                        'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()

        self._host_scheduler.recovery_on_startup()


    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        self._host_scheduler.tick()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        django.db.reset_queries()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING,
                    models.HostQueueEntry.Status.ARCHIVING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
                is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.ARCHIVING:
            return ArchiveResultsTask(queue_entries=task_entries)

        raise host_scheduler.SchedulerError(
                '_get_agent_task_for_queue_entry got entry with '
                'invalid status %s: %s' % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        non_host_statuses = (models.HostQueueEntry.Status.PARSING,
                             models.HostQueueEntry.Status.ARCHIVING)
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status not in non_host_statuses)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise host_scheduler.SchedulerError(
                    'While scheduling %s, host %s already has a host agent %s'
                    % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise host_scheduler.SchedulerError(
                'No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in scheduler_models.HostQueueEntry.fetch(where="status = '%s'"
                % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                    task__in=(models.SpecialTask.Task.CLEANUP,
                              models.SpecialTask.Task.VERIFY),
                    queue_entry__id=queue_entry.id,
                    is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise host_scheduler.SchedulerError(
                    '%d unrecovered verifying host queue entries:\n%s' %
                    (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.SpecialTask.objects.add_join(
                queued_tasks, 'afe_host_queue_entries', 'host_id',
                join_condition='afe_host_queue_entries.active',
                join_from_key='host_id', force_left_join=True)
        queued_tasks = queued_tasks.extra(
                where=['(afe_host_queue_entries.id IS NULL OR '
                       'afe_host_queue_entries.id = '
                       'afe_special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)

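    # Illustrative note (added, not in the original): given queued SpecialTasks
    # of types [VERIFY, REPAIR, CLEANUP, VERIFY], _get_prioritized_special_tasks()
    # above returns them ordered [REPAIR, CLEANUP, VERIFY, VERIFY], since
    # sorted() is stable and task_priority_order ranks Repair before Cleanup
    # before Verify.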

    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
                "status IN ('Repairing', 'Verifying', 'Cleaning')",
                print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in scheduler_models.Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                    task=models.SpecialTask.Task.CLEANUP,
                    host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(scheduler_models.HostQueueEntry.fetch(
            joins='INNER JOIN afe_jobs ON (job_id=afe_jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='afe_jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
                queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))

        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = scheduler_models.HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            new_hqe.set_host(assigned_host)
            self._run_queue_entry(new_hqe)

        # The first assigned host uses the original HostQueueEntry
        queue_entry.set_host(group_hosts[0])
        self._run_queue_entry(queue_entry)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))
        queue_entry.set_status(models.HostQueueEntry.Status.STARTING)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                    queue_entry.atomic_group_id is not None
                    and queue_entry.host_id is None)

            if queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            elif is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            else:
                assigned_host = self._host_scheduler.schedule_entry(queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    assert assigned_host.id == queue_entry.host_id
                    self._run_queue_entry(queue_entry)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='status = "%s"' % models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry):
        queue_entry.schedule_pre_job_tasks()


    def _find_aborting(self):
        jobs_to_stop = set()
        for entry in scheduler_models.HostQueueEntry.fetch(
                where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)
            jobs_to_stop.add(entry.job)
        for job in jobs_to_stop:
            job.stop_if_necessary()


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
                agent.task.owner_username,
                agent.task.get_drone_hostnames_allowed())
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

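    # Worked example (added comment, not in the original): with a hypothetical
    # max_processes_started_per_cycle of 50, an agent whose task needs 10
    # processes may start as long as num_started_this_cycle is at most 40 and
    # the drone manager still has capacity for 10 more processes for that
    # owner. Once the per-cycle limit is hit, _handle_agents() below sets
    # have_reached_limit, so no further nonzero-process agents start during
    # this tick (zero-process agents are still allowed).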

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
                start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None, drone_hostnames_allowed=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username,
            drone_hostnames_allowed=drone_hostnames_allowed)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=drone_manager.AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                        'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
         pid=None, exit_status=None if autoserv has not yet run
         pid!=None, exit_status=None if autoserv is running
         pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > _get_pidfile_timeout_secs():
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """@returns The number of tests that failed or -1 if unknown."""
        self._get_pidfile_info()
        if self._state.num_tests_failed is None:
            return -1
        return self._state.num_tests_failed


    def try_copy_results_on_drone(self, **kwargs):
        if self.has_process():
            # copy results logs into the normal place for job results
            _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)


    def try_copy_to_results_repository(self, source, **kwargs):
        if self.has_process():
            _drone_manager.copy_to_results_repository(self.get_process(),
                                                      source, **kwargs)


class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a task.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested. The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.
    """

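    # Minimal sketch of the task protocol described in the docstring above
    # (added comment, not part of the original file; NoopTask is hypothetical):
    #
    #   class NoopTask(object):
    #       queue_entry_ids = []
    #       host_ids = []
    #       aborted = False
    #       success = True
    #       num_processes = 0  # also consulted by Dispatcher throttling
    #       def poll(self): pass
    #       def is_done(self): return True
    #       def abort(self): self.aborted = True
    #
    # Dispatcher.add_agent_task(NoopTask()) would wrap the task in an Agent and
    # drive it to completion on the next tick.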

    def __init__(self, task):
        """
        @param task: A task as described in the class docstring.
        """
        self.task = task

        # This is filled in by Dispatcher.add_agent_task()
jadmanski0afbb632008-06-06 21:10:57 +00001033 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001034
showard8cc058f2009-09-08 16:26:33 +00001035 self.queue_entry_ids = task.queue_entry_ids
1036 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001037
showard8cc058f2009-09-08 16:26:33 +00001038 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001039 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001040
1041
jadmanski0afbb632008-06-06 21:10:57 +00001042 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001043 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001044 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001045 self.task.poll()
1046 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001047 self.finished = True
showardec113162008-05-08 00:52:49 +00001048
1049
jadmanski0afbb632008-06-06 21:10:57 +00001050 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001051 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001052
1053
showardd3dc1992009-04-22 21:01:40 +00001054 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001055 if self.task:
1056 self.task.abort()
1057 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001058 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001059 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001060
showardd3dc1992009-04-22 21:01:40 +00001061
mbligh36768f02008-02-22 18:28:33 +00001062class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001063 class _NullMonitor(object):
1064 pidfile_id = None
1065
1066 def has_process(self):
1067 return True
1068
1069
1070 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001071 """
showardd1195652009-12-08 22:21:02 +00001072 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001073 """
jadmanski0afbb632008-06-06 21:10:57 +00001074 self.done = False
showardd1195652009-12-08 22:21:02 +00001075 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001076 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001077 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001078 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001079 self.queue_entry_ids = []
1080 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001081 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001082
1083
1084 def _set_ids(self, host=None, queue_entries=None):
1085 if queue_entries and queue_entries != [None]:
1086 self.host_ids = [entry.host.id for entry in queue_entries]
1087 self.queue_entry_ids = [entry.id for entry in queue_entries]
1088 else:
1089 assert host
1090 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001091
1092
jadmanski0afbb632008-06-06 21:10:57 +00001093 def poll(self):
showard08a36412009-05-05 01:01:13 +00001094 if not self.started:
1095 self.start()
showardd1195652009-12-08 22:21:02 +00001096 if not self.done:
1097 self.tick()
showard08a36412009-05-05 01:01:13 +00001098
1099
1100 def tick(self):
showardd1195652009-12-08 22:21:02 +00001101 assert self.monitor
1102 exit_code = self.monitor.exit_code()
1103 if exit_code is None:
1104 return
mbligh36768f02008-02-22 18:28:33 +00001105
showardd1195652009-12-08 22:21:02 +00001106 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001107 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001108
1109
jadmanski0afbb632008-06-06 21:10:57 +00001110 def is_done(self):
1111 return self.done
mbligh36768f02008-02-22 18:28:33 +00001112
1113
jadmanski0afbb632008-06-06 21:10:57 +00001114 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001115 if self.done:
showardd1195652009-12-08 22:21:02 +00001116 assert self.started
showard08a36412009-05-05 01:01:13 +00001117 return
showardd1195652009-12-08 22:21:02 +00001118 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001119 self.done = True
1120 self.success = success
1121 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001122
1123
jadmanski0afbb632008-06-06 21:10:57 +00001124 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001125 """
1126 To be overridden.
1127 """
showarded2afea2009-07-07 20:54:07 +00001128 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001129 self.register_necessary_pidfiles()
1130
1131
1132 def _log_file(self):
1133 if not self._log_file_name:
1134 return None
1135 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001136
mbligh36768f02008-02-22 18:28:33 +00001137
jadmanski0afbb632008-06-06 21:10:57 +00001138 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001139 log_file = self._log_file()
1140 if self.monitor and log_file:
1141 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001142
1143
jadmanski0afbb632008-06-06 21:10:57 +00001144 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001145 """
1146 To be overridden.
1147 """
jadmanski0afbb632008-06-06 21:10:57 +00001148 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001149 logging.info("%s finished with success=%s", type(self).__name__,
1150 self.success)
1151
mbligh36768f02008-02-22 18:28:33 +00001152
1153
jadmanski0afbb632008-06-06 21:10:57 +00001154 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001155 if not self.started:
1156 self.prolog()
1157 self.run()
1158
1159 self.started = True
1160
1161
1162 def abort(self):
1163 if self.monitor:
1164 self.monitor.kill()
1165 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001166 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001167 self.cleanup()
1168
1169
showarded2afea2009-07-07 20:54:07 +00001170 def _get_consistent_execution_path(self, execution_entries):
1171 first_execution_path = execution_entries[0].execution_path()
1172 for execution_entry in execution_entries[1:]:
1173 assert execution_entry.execution_path() == first_execution_path, (
1174 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1175 execution_entry,
1176 first_execution_path,
1177 execution_entries[0]))
1178 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001179
1180
showarded2afea2009-07-07 20:54:07 +00001181 def _copy_results(self, execution_entries, use_monitor=None):
1182 """
1183 @param execution_entries: list of objects with execution_path() method
 1184 @param use_monitor: optional PidfileRunMonitor to copy results with; defaults to self.monitor
 """
showard6d1c1432009-08-20 23:30:39 +00001185 if use_monitor is not None and not use_monitor.has_process():
1186 return
1187
showarded2afea2009-07-07 20:54:07 +00001188 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001189 if use_monitor is None:
1190 assert self.monitor
1191 use_monitor = self.monitor
1192 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001193 execution_path = self._get_consistent_execution_path(execution_entries)
1194 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001195 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001196
showarda1e74b32009-05-12 17:32:04 +00001197
1198 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001199 for queue_entry in queue_entries:
1200 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001201
1202
mbligh4608b002010-01-05 18:22:35 +00001203 def _archive_results(self, queue_entries):
1204 for queue_entry in queue_entries:
1205 queue_entry.set_status(models.HostQueueEntry.Status.ARCHIVING)
showarda1e74b32009-05-12 17:32:04 +00001206
1207
showardd1195652009-12-08 22:21:02 +00001208 def _command_line(self):
1209 """
1210 Return the command line to run. Must be overridden.
1211 """
1212 raise NotImplementedError
1213
1214
1215 @property
1216 def num_processes(self):
1217 """
1218 Return the number of processes forked by this AgentTask's process. It
1219 may only be approximate. To be overridden if necessary.
1220 """
1221 return 1
1222
1223
1224 def _paired_with_monitor(self):
1225 """
1226 If this AgentTask's process must run on the same machine as some
1227 previous process, this method should be overridden to return a
1228 PidfileRunMonitor for that process.
1229 """
1230 return self._NullMonitor()
1231
1232
1233 @property
1234 def owner_username(self):
1235 """
1236 Return login of user responsible for this task. May be None. Must be
1237 overridden.
1238 """
1239 raise NotImplementedError
1240
1241
1242 def _working_directory(self):
1243 """
1244 Return the directory where this AgentTask's process executes. Must be
1245 overridden.
1246 """
1247 raise NotImplementedError
1248
1249
1250 def _pidfile_name(self):
1251 """
1252 Return the name of the pidfile this AgentTask's process uses. To be
1253 overridden if necessary.
1254 """
jamesrenc44ae992010-02-19 00:12:54 +00001255 return drone_manager.AUTOSERV_PID_FILE
showardd1195652009-12-08 22:21:02 +00001256
1257
1258 def _check_paired_results_exist(self):
1259 if not self._paired_with_monitor().has_process():
1260 email_manager.manager.enqueue_notify_email(
1261 'No paired results in task',
1262 'No paired results in task %s at %s'
1263 % (self, self._paired_with_monitor().pidfile_id))
1264 self.finished(False)
1265 return False
1266 return True
1267
1268
1269 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001270 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001271 self.monitor = PidfileRunMonitor()
1272
1273
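# run() launches this task's process: it builds the command line and calls
# PidfileRunMonitor.run() with the task's working directory, pidfile name and
# log file, pairing the new process with any prerequisite process reported by
# _paired_with_monitor().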
1274 def run(self):
1275 if not self._check_paired_results_exist():
1276 return
1277
1278 self._create_monitor()
1279 self.monitor.run(
1280 self._command_line(), self._working_directory(),
1281 num_processes=self.num_processes,
1282 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1283 pidfile_name=self._pidfile_name(),
1284 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
jamesren76fcf192010-04-21 20:39:50 +00001285 username=self.owner_username,
1286 drone_hostnames_allowed=self.get_drone_hostnames_allowed())
1287
1288
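# Drone-set resolution, as implemented below: if drone sets are disabled the
# task may run on any drone (None); a special task with no queue entries falls
# back to its requester's default drone set (or the global default); otherwise
# the single job spanned by the queue entries supplies its drone set, falling
# back to the job owner's default or the global default.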
1289 def get_drone_hostnames_allowed(self):
1290 if not models.DroneSet.drone_sets_enabled():
1291 return None
1292
1293 hqes = models.HostQueueEntry.objects.filter(id__in=self.queue_entry_ids)
1294 if not hqes:
1295 # Only special tasks could be missing host queue entries
1296 assert isinstance(self, SpecialAgentTask)
1297 return self._user_or_global_default_drone_set(
1298 self.task, self.task.requested_by)
1299
1300 job_ids = hqes.values_list('job', flat=True).distinct()
1301 assert job_ids.count() == 1, ("AgentTask's queue entries "
1302 "span multiple jobs")
1303
1304 job = models.Job.objects.get(id=job_ids[0])
1305 drone_set = job.drone_set
1306 if not drone_set:
jamesrendd77e012010-04-28 18:07:30 +00001307 return self._user_or_global_default_drone_set(job, job.user())
jamesren76fcf192010-04-21 20:39:50 +00001308
1309 return drone_set.get_drone_hostnames()
1310
1311
1312 def _user_or_global_default_drone_set(self, obj_with_owner, user):
1313 """
1314 Returns the user's default drone set, if present.
1315
1316 Otherwise, returns the global default drone set.
1317 """
1318 default_hostnames = models.DroneSet.get_default().get_drone_hostnames()
1319 if not user:
1320 logging.warn('%s had no owner; using default drone set',
1321 obj_with_owner)
1322 return default_hostnames
1323 if not user.drone_set:
1324 logging.warn('User %s has no default drone set, using global '
1325 'default', user.login)
1326 return default_hostnames
1327 return user.drone_set.get_drone_hostnames()
showardd1195652009-12-08 22:21:02 +00001328
1329
1330 def register_necessary_pidfiles(self):
1331 pidfile_id = _drone_manager.get_pidfile_id_from(
1332 self._working_directory(), self._pidfile_name())
1333 _drone_manager.register_pidfile(pidfile_id)
1334
1335 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1336 if paired_pidfile_id:
1337 _drone_manager.register_pidfile(paired_pidfile_id)
1338
1339
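# recover() is the restart path: it reattaches a PidfileRunMonitor to a
# process that may still be running from a previous scheduler instance. If no
# such process is found, the monitor is dropped and the task is later started
# normally through poll()/start().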
1340 def recover(self):
1341 if not self._check_paired_results_exist():
1342 return
1343
1344 self._create_monitor()
1345 self.monitor.attach_to_existing_process(
1346 self._working_directory(), pidfile_name=self._pidfile_name(),
1347 num_processes=self.num_processes)
1348 if not self.monitor.has_process():
1349 # no process to recover; wait to be started normally
1350 self.monitor = None
1351 return
1352
1353 self.started = True
 1354 logging.info('Recovering process %s for %s at %s',
 1355 self.monitor.get_process(), type(self).__name__,
 1356 self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001357
1358
mbligh4608b002010-01-05 18:22:35 +00001359 def _check_queue_entry_statuses(self, queue_entries, allowed_hqe_statuses,
1360 allowed_host_statuses=None):
jamesrenb8f3f352010-06-10 00:44:06 +00001361 class_name = self.__class__.__name__
mbligh4608b002010-01-05 18:22:35 +00001362 for entry in queue_entries:
1363 if entry.status not in allowed_hqe_statuses:
Dale Curtisaa513362011-03-01 17:27:44 -08001364 raise host_scheduler.SchedulerError(
1365 '%s attempting to start entry with invalid status %s: '
1366 '%s' % (class_name, entry.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001367 invalid_host_status = (
1368 allowed_host_statuses is not None
1369 and entry.host.status not in allowed_host_statuses)
1370 if invalid_host_status:
Dale Curtisaa513362011-03-01 17:27:44 -08001371 raise host_scheduler.SchedulerError(
1372 '%s attempting to start on queue entry with invalid '
1373 'host status %s: %s'
1374 % (class_name, entry.host.status, entry))
mbligh4608b002010-01-05 18:22:35 +00001375
1376
showardd9205182009-04-27 20:09:55 +00001377class TaskWithJobKeyvals(object):
1378 """AgentTask mixin providing functionality to help with job keyval files."""
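# The keyval file is plain text with one key=value pair per line, e.g.
# (illustrative values only):
#   job_queued=1266537600
#   job_finished=1266541200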
1379 _KEYVAL_FILE = 'keyval'
1380 def _format_keyval(self, key, value):
1381 return '%s=%s' % (key, value)
1382
1383
1384 def _keyval_path(self):
1385 """Subclasses must override this"""
lmrb7c5d272010-04-16 06:34:04 +00001386 raise NotImplementedError
showardd9205182009-04-27 20:09:55 +00001387
1388
1389 def _write_keyval_after_job(self, field, value):
1390 assert self.monitor
1391 if not self.monitor.has_process():
1392 return
1393 _drone_manager.write_lines_to_file(
1394 self._keyval_path(), [self._format_keyval(field, value)],
1395 paired_with_process=self.monitor.get_process())
1396
1397
1398 def _job_queued_keyval(self, job):
1399 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1400
1401
1402 def _write_job_finished(self):
1403 self._write_keyval_after_job("job_finished", int(time.time()))
1404
1405
showarddb502762009-09-09 15:31:20 +00001406 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1407 keyval_contents = '\n'.join(self._format_keyval(key, value)
1408 for key, value in keyval_dict.iteritems())
1409 # always end with a newline to allow additional keyvals to be written
1410 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001411 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001412 keyval_contents,
1413 file_path=keyval_path)
1414
1415
1416 def _write_keyvals_before_job(self, keyval_dict):
1417 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1418
1419
1420 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001421 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001422 host.hostname)
1423 platform, all_labels = host.platform_and_labels()
Eric Li6f27d4f2010-09-29 10:55:17 -07001424 all_labels = [ urllib.quote(label) for label in all_labels ]
showarddb502762009-09-09 15:31:20 +00001425 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1426 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1427
1428
showard8cc058f2009-09-08 16:26:33 +00001429class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001430 """
1431 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1432 """
1433
1434 TASK_TYPE = None
1435 host = None
1436 queue_entry = None
1437
showardd1195652009-12-08 22:21:02 +00001438 def __init__(self, task, extra_command_args):
1439 super(SpecialAgentTask, self).__init__()
1440
lmrb7c5d272010-04-16 06:34:04 +00001441 assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
showard8cc058f2009-09-08 16:26:33 +00001442
jamesrenc44ae992010-02-19 00:12:54 +00001443 self.host = scheduler_models.Host(id=task.host.id)
showard8cc058f2009-09-08 16:26:33 +00001444 self.queue_entry = None
1445 if task.queue_entry:
jamesrenc44ae992010-02-19 00:12:54 +00001446 self.queue_entry = scheduler_models.HostQueueEntry(
1447 id=task.queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00001448
showarded2afea2009-07-07 20:54:07 +00001449 self.task = task
1450 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001451
1452
showard8cc058f2009-09-08 16:26:33 +00001453 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001454 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1455
1456
1457 def _command_line(self):
1458 return _autoserv_command_line(self.host.hostname,
1459 self._extra_command_args,
1460 queue_entry=self.queue_entry)
1461
1462
1463 def _working_directory(self):
1464 return self.task.execution_path()
1465
1466
1467 @property
1468 def owner_username(self):
1469 if self.task.requested_by:
1470 return self.task.requested_by.login
1471 return None
showard8cc058f2009-09-08 16:26:33 +00001472
1473
showarded2afea2009-07-07 20:54:07 +00001474 def prolog(self):
1475 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001476 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001477 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001478
1479
showardde634ee2009-01-30 01:44:24 +00001480 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001481 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001482
showard2fe3f1d2009-07-06 20:19:11 +00001483 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001484 return # don't fail metahost entries, they'll be reassigned
1485
showard2fe3f1d2009-07-06 20:19:11 +00001486 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001487 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001488 return # entry has been aborted
1489
showard2fe3f1d2009-07-06 20:19:11 +00001490 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001491 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001492 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001493 self._write_keyval_after_job(queued_key, queued_time)
1494 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001495
showard8cc058f2009-09-08 16:26:33 +00001496 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001497 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001498 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001499 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001500
showard8cc058f2009-09-08 16:26:33 +00001501 pidfile_id = _drone_manager.get_pidfile_id_from(
1502 self.queue_entry.execution_path(),
jamesrenc44ae992010-02-19 00:12:54 +00001503 pidfile_name=drone_manager.AUTOSERV_PID_FILE)
showard8cc058f2009-09-08 16:26:33 +00001504 _drone_manager.register_pidfile(pidfile_id)
mbligh4608b002010-01-05 18:22:35 +00001505
1506 if self.queue_entry.job.parse_failed_repair:
1507 self._parse_results([self.queue_entry])
1508 else:
1509 self._archive_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001510
1511
1512 def cleanup(self):
1513 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001514
1515 # We will consider an aborted task to be "Failed"
1516 self.task.finish(bool(self.success))
1517
showardf85a0b72009-10-07 20:48:45 +00001518 if self.monitor:
1519 if self.monitor.has_process():
1520 self._copy_results([self.task])
1521 if self.monitor.pidfile_id is not None:
1522 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001523
1524
1525class RepairTask(SpecialAgentTask):
1526 TASK_TYPE = models.SpecialTask.Task.REPAIR
1527
1528
showardd1195652009-12-08 22:21:02 +00001529 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001530 """\
 1531 task: SpecialTask entry for this repair; its queue entry (if any) is marked failed if the repair fails.
1532 """
1533 protection = host_protections.Protection.get_string(
1534 task.host.protection)
1535 # normalize the protection name
1536 protection = host_protections.Protection.get_attr_name(protection)
1537
1538 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001539 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001540
1541 # *don't* include the queue entry in IDs -- if the queue entry is
1542 # aborted, we want to leave the repair task running
1543 self._set_ids(host=self.host)
1544
1545
1546 def prolog(self):
1547 super(RepairTask, self).prolog()
1548 logging.info("repair_task starting")
1549 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001550
1551
jadmanski0afbb632008-06-06 21:10:57 +00001552 def epilog(self):
1553 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001554
jadmanski0afbb632008-06-06 21:10:57 +00001555 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001556 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001557 else:
showard8cc058f2009-09-08 16:26:33 +00001558 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001559 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001560 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001561
1562
showarded2afea2009-07-07 20:54:07 +00001563class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001564 def _copy_to_results_repository(self):
1565 if not self.queue_entry or self.queue_entry.meta_host:
1566 return
1567
1568 self.queue_entry.set_execution_subdir()
1569 log_name = os.path.basename(self.task.execution_path())
1570 source = os.path.join(self.task.execution_path(), 'debug',
1571 'autoserv.DEBUG')
1572 destination = os.path.join(
1573 self.queue_entry.execution_path(), log_name)
1574
1575 self.monitor.try_copy_to_results_repository(
1576 source, destination_path=destination)
1577
1578
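# On failure this epilog copies the autoserv debug log into the job's results
# and, unless the host is protected with DO_NOT_VERIFY (in which case the
# failure is ignored), requeues the queue entry (if any) and schedules a
# repair. If a repair tied to this queue entry already exists, the host is
# marked REPAIR_FAILED and the entry is failed, presumably to avoid an
# endless verify/repair cycle.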
showard170873e2009-01-07 00:22:26 +00001579 def epilog(self):
1580 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001581
showard775300b2009-09-09 15:30:50 +00001582 if self.success:
1583 return
showard8fe93b52008-11-18 17:53:22 +00001584
showard775300b2009-09-09 15:30:50 +00001585 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001586
showard775300b2009-09-09 15:30:50 +00001587 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001588 # effectively ignore failure for these hosts
1589 self.success = True
showard775300b2009-09-09 15:30:50 +00001590 return
1591
1592 if self.queue_entry:
1593 self.queue_entry.requeue()
1594
1595 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001596 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001597 queue_entry__id=self.queue_entry.id):
1598 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1599 self._fail_queue_entry()
1600 return
1601
showard9bb960b2009-11-19 01:02:11 +00001602 queue_entry = models.HostQueueEntry.objects.get(
1603 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001604 else:
1605 queue_entry = None
1606
1607 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001608 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001609 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001610 queue_entry=queue_entry,
1611 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001612
showard8fe93b52008-11-18 17:53:22 +00001613
1614class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001615 TASK_TYPE = models.SpecialTask.Task.VERIFY
1616
1617
showardd1195652009-12-08 22:21:02 +00001618 def __init__(self, task):
1619 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001620 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001621
1622
jadmanski0afbb632008-06-06 21:10:57 +00001623 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001624 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001625
showardb18134f2009-03-20 20:52:18 +00001626 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001627 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001628 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1629 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001630
jamesren42318f72010-05-10 23:40:59 +00001631 # Delete any queued manual reverifies for this host. One verify will do
showarded2afea2009-07-07 20:54:07 +00001632 # and there's no need to keep records of other requests.
1633 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001634 host__id=self.host.id,
1635 task=models.SpecialTask.Task.VERIFY,
jamesren42318f72010-05-10 23:40:59 +00001636 is_active=False, is_complete=False, queue_entry=None)
showarded2afea2009-07-07 20:54:07 +00001637 queued_verifies = queued_verifies.exclude(id=self.task.id)
1638 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001639
mbligh36768f02008-02-22 18:28:33 +00001640
jadmanski0afbb632008-06-06 21:10:57 +00001641 def epilog(self):
1642 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001643 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001644 if self.queue_entry:
1645 self.queue_entry.on_pending()
1646 else:
1647 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001648
1649
mbligh4608b002010-01-05 18:22:35 +00001650class CleanupTask(PreJobTask):
1651 # note this can also run post-job, but when it does, it's running standalone
1652 # against the host (not related to the job), so it's not considered a
1653 # PostJobTask
1654
1655 TASK_TYPE = models.SpecialTask.Task.CLEANUP
1656
1657
1658 def __init__(self, task, recover_run_monitor=None):
1659 super(CleanupTask, self).__init__(task, ['--cleanup'])
1660 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
1661
1662
1663 def prolog(self):
1664 super(CleanupTask, self).prolog()
1665 logging.info("starting cleanup task for host: %s", self.host.hostname)
1666 self.host.set_status(models.Host.Status.CLEANING)
1667 if self.queue_entry:
1668 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1669
1670
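# After a successful cleanup with an associated queue entry, either a verify
# special task is scheduled (when the job requests verification and the host
# is not DO_NOT_VERIFY) or the entry is moved straight to pending.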
1671 def _finish_epilog(self):
1672 if not self.queue_entry or not self.success:
1673 return
1674
1675 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
1676 should_run_verify = (
1677 self.queue_entry.job.run_verify
1678 and self.host.protection != do_not_verify_protection)
1679 if should_run_verify:
1680 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
1681 models.SpecialTask.objects.create(
1682 host=models.Host.objects.get(id=self.host.id),
1683 queue_entry=entry,
1684 task=models.SpecialTask.Task.VERIFY)
1685 else:
1686 self.queue_entry.on_pending()
1687
1688
1689 def epilog(self):
1690 super(CleanupTask, self).epilog()
1691
1692 if self.success:
1693 self.host.update_field('dirty', 0)
1694 self.host.set_status(models.Host.Status.READY)
1695
1696 self._finish_epilog()
1697
1698
showarda9545c02009-12-18 22:44:26 +00001699class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1700 """
1701 Common functionality for QueueTask and HostlessQueueTask
1702 """
1703 def __init__(self, queue_entries):
1704 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001705 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001706 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001707
1708
showard73ec0442009-02-07 02:05:20 +00001709 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001710 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001711
1712
jamesrenc44ae992010-02-19 00:12:54 +00001713 def _write_control_file(self, execution_path):
1714 control_path = _drone_manager.attach_file_to_execution(
1715 execution_path, self.job.control_file)
1716 return control_path
1717
1718
showardd1195652009-12-08 22:21:02 +00001719 def _command_line(self):
jamesrenc44ae992010-02-19 00:12:54 +00001720 execution_path = self.queue_entries[0].execution_path()
1721 control_path = self._write_control_file(execution_path)
1722 hostnames = ','.join(entry.host.hostname
1723 for entry in self.queue_entries
1724 if not entry.is_hostless())
1725
1726 execution_tag = self.queue_entries[0].execution_tag()
1727 params = _autoserv_command_line(
1728 hostnames,
1729 ['-P', execution_tag, '-n',
1730 _drone_manager.absolute_path(control_path)],
1731 job=self.job, verbose=False)
1732
1733 if not self.job.is_server_job():
1734 params.append('-c')
1735
1736 return params
showardd1195652009-12-08 22:21:02 +00001737
1738
1739 @property
1740 def num_processes(self):
1741 return len(self.queue_entries)
1742
1743
1744 @property
1745 def owner_username(self):
1746 return self.job.owner
1747
1748
1749 def _working_directory(self):
1750 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00001751
1752
jadmanski0afbb632008-06-06 21:10:57 +00001753 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001754 queued_key, queued_time = self._job_queued_keyval(self.job)
showardc1a98d12010-01-15 00:22:22 +00001755 keyval_dict = self.job.keyval_dict()
1756 keyval_dict[queued_key] = queued_time
showardd1195652009-12-08 22:21:02 +00001757 group_name = self.queue_entries[0].get_group_name()
1758 if group_name:
1759 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00001760 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001761 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00001762 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00001763 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00001764
1765
showard35162b02009-03-03 02:17:30 +00001766 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00001767 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001768 _drone_manager.write_lines_to_file(error_file_path,
1769 [_LOST_PROCESS_ERROR])
1770
1771
showardd3dc1992009-04-22 21:01:40 +00001772 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001773 if not self.monitor:
1774 return
1775
showardd9205182009-04-27 20:09:55 +00001776 self._write_job_finished()
1777
showard35162b02009-03-03 02:17:30 +00001778 if self.monitor.lost_process:
1779 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00001780
jadmanskif7fa2cc2008-10-01 14:13:23 +00001781
showardcbd74612008-11-19 21:42:02 +00001782 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001783 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00001784 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001785 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001786 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001787
1788
jadmanskif7fa2cc2008-10-01 14:13:23 +00001789 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001790 if not self.monitor or not self.monitor.has_process():
1791 return
1792
jadmanskif7fa2cc2008-10-01 14:13:23 +00001793 # build up sets of all the aborted_by and aborted_on values
1794 aborted_by, aborted_on = set(), set()
1795 for queue_entry in self.queue_entries:
1796 if queue_entry.aborted_by:
1797 aborted_by.add(queue_entry.aborted_by)
1798 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1799 aborted_on.add(t)
1800
 1801 # extract the unique aborted_by value, if any, and write it out
showard64a95952010-01-13 21:27:16 +00001802 # TODO(showard): this conditional is now obsolete, we just need to leave
1803 # it in temporarily for backwards compatibility over upgrades. delete
1804 # soon.
jadmanskif7fa2cc2008-10-01 14:13:23 +00001805 assert len(aborted_by) <= 1
1806 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001807 aborted_by_value = aborted_by.pop()
1808 aborted_on_value = max(aborted_on)
1809 else:
1810 aborted_by_value = 'autotest_system'
1811 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001812
showarda0382352009-02-11 23:36:43 +00001813 self._write_keyval_after_job("aborted_by", aborted_by_value)
1814 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001815
showardcbd74612008-11-19 21:42:02 +00001816 aborted_on_string = str(datetime.datetime.fromtimestamp(
1817 aborted_on_value))
1818 self._write_status_comment('Job aborted by %s on %s' %
1819 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001820
1821
jadmanski0afbb632008-06-06 21:10:57 +00001822 def abort(self):
showarda9545c02009-12-18 22:44:26 +00001823 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001824 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001825 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001826
1827
jadmanski0afbb632008-06-06 21:10:57 +00001828 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00001829 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001830 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00001831
1832
1833class QueueTask(AbstractQueueTask):
1834 def __init__(self, queue_entries):
1835 super(QueueTask, self).__init__(queue_entries)
1836 self._set_ids(queue_entries=queue_entries)
1837
1838
1839 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001840 self._check_queue_entry_statuses(
1841 self.queue_entries,
1842 allowed_hqe_statuses=(models.HostQueueEntry.Status.STARTING,
1843 models.HostQueueEntry.Status.RUNNING),
1844 allowed_host_statuses=(models.Host.Status.PENDING,
1845 models.Host.Status.RUNNING))
showarda9545c02009-12-18 22:44:26 +00001846
1847 super(QueueTask, self).prolog()
1848
1849 for queue_entry in self.queue_entries:
1850 self._write_host_keyvals(queue_entry.host)
1851 queue_entry.host.set_status(models.Host.Status.RUNNING)
1852 queue_entry.host.update_field('dirty', 1)
1853 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1854 # TODO(gps): Remove this if nothing needs it anymore.
1855 # A potential user is: tko/parser
1856 self.job.write_to_machines_file(self.queue_entries[0])
1857
1858
1859 def _finish_task(self):
1860 super(QueueTask, self)._finish_task()
1861
1862 for queue_entry in self.queue_entries:
1863 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
jamesrenb8f3f352010-06-10 00:44:06 +00001864 queue_entry.host.set_status(models.Host.Status.RUNNING)
mbligh36768f02008-02-22 18:28:33 +00001865
1866
mbligh4608b002010-01-05 18:22:35 +00001867class HostlessQueueTask(AbstractQueueTask):
1868 def __init__(self, queue_entry):
1869 super(HostlessQueueTask, self).__init__([queue_entry])
1870 self.queue_entry_ids = [queue_entry.id]
1871
1872
1873 def prolog(self):
1874 self.queue_entries[0].update_field('execution_subdir', 'hostless')
1875 super(HostlessQueueTask, self).prolog()
1876
1877
mbligh4608b002010-01-05 18:22:35 +00001878 def _finish_task(self):
1879 super(HostlessQueueTask, self)._finish_task()
showardcc929362010-01-25 21:20:41 +00001880 self.queue_entries[0].set_status(models.HostQueueEntry.Status.PARSING)
mbligh4608b002010-01-05 18:22:35 +00001881
1882
showardd3dc1992009-04-22 21:01:40 +00001883class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00001884 def __init__(self, queue_entries, log_file_name):
1885 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00001886
showardd1195652009-12-08 22:21:02 +00001887 self.queue_entries = queue_entries
1888
showardd3dc1992009-04-22 21:01:40 +00001889 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00001890 self._autoserv_monitor.attach_to_existing_process(
1891 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00001892
showardd1195652009-12-08 22:21:02 +00001893
1894 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00001895 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00001896 return 'true'
1897 return self._generate_command(
1898 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00001899
1900
1901 def _generate_command(self, results_dir):
1902 raise NotImplementedError('Subclasses must override this')
1903
1904
showardd1195652009-12-08 22:21:02 +00001905 @property
1906 def owner_username(self):
1907 return self.queue_entries[0].job.owner
1908
1909
1910 def _working_directory(self):
1911 return self._get_consistent_execution_path(self.queue_entries)
1912
1913
1914 def _paired_with_monitor(self):
1915 return self._autoserv_monitor
1916
1917
showardd3dc1992009-04-22 21:01:40 +00001918 def _job_was_aborted(self):
1919 was_aborted = None
showardd1195652009-12-08 22:21:02 +00001920 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001921 queue_entry.update_from_database()
1922 if was_aborted is None: # first queue entry
1923 was_aborted = bool(queue_entry.aborted)
1924 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
jamesren17cadd62010-06-16 23:26:55 +00001925 entries = ['%s (aborted: %s)' % (entry, entry.aborted)
1926 for entry in self.queue_entries]
showardd3dc1992009-04-22 21:01:40 +00001927 email_manager.manager.enqueue_notify_email(
jamesren17cadd62010-06-16 23:26:55 +00001928 'Inconsistent abort state',
1929 'Queue entries have inconsistent abort state:\n' +
1930 '\n'.join(entries))
showardd3dc1992009-04-22 21:01:40 +00001931 # don't crash here, just assume true
1932 return True
1933 return was_aborted
1934
1935
showardd1195652009-12-08 22:21:02 +00001936 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00001937 if self._job_was_aborted():
1938 return models.HostQueueEntry.Status.ABORTED
1939
1940 # we'll use a PidfileRunMonitor to read the autoserv exit status
1941 if self._autoserv_monitor.exit_code() == 0:
1942 return models.HostQueueEntry.Status.COMPLETED
1943 return models.HostQueueEntry.Status.FAILED
1944
1945
showardd3dc1992009-04-22 21:01:40 +00001946 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00001947 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00001948 queue_entry.set_status(status)
1949
1950
1951 def abort(self):
1952 # override AgentTask.abort() to avoid killing the process and ending
1953 # the task. post-job tasks continue when the job is aborted.
1954 pass
1955
1956
mbligh4608b002010-01-05 18:22:35 +00001957 def _pidfile_label(self):
1958 # '.autoserv_execute' -> 'autoserv'
1959 return self._pidfile_name()[1:-len('_execute')]
1960
1961
showard9bb960b2009-11-19 01:02:11 +00001962class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00001963 """
1964 Task responsible for
1965 * gathering uncollected logs (if Autoserv crashed hard or was killed)
1966 * copying logs to the results repository
1967 * spawning CleanupTasks for hosts, if necessary
1968 * spawning a FinalReparseTask for the job
1969 """
showardd1195652009-12-08 22:21:02 +00001970 def __init__(self, queue_entries, recover_run_monitor=None):
1971 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00001972 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001973 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00001974 self._set_ids(queue_entries=queue_entries)
1975
1976
1977 def _generate_command(self, results_dir):
1978 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00001979 for queue_entry in self.queue_entries)
mbligh4608b002010-01-05 18:22:35 +00001980 return [_autoserv_path, '-p',
1981 '--pidfile-label=%s' % self._pidfile_label(),
1982 '--use-existing-results', '--collect-crashinfo',
1983 '-m', host_list, '-r', results_dir]
showardd3dc1992009-04-22 21:01:40 +00001984
1985
showardd1195652009-12-08 22:21:02 +00001986 @property
1987 def num_processes(self):
1988 return len(self.queue_entries)
1989
1990
1991 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00001992 return drone_manager.CRASHINFO_PID_FILE
showardd1195652009-12-08 22:21:02 +00001993
1994
showardd3dc1992009-04-22 21:01:40 +00001995 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00001996 self._check_queue_entry_statuses(
1997 self.queue_entries,
1998 allowed_hqe_statuses=(models.HostQueueEntry.Status.GATHERING,),
1999 allowed_host_statuses=(models.Host.Status.RUNNING,))
showard8cc058f2009-09-08 16:26:33 +00002000
showardd3dc1992009-04-22 21:01:40 +00002001 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002002
2003
showardd3dc1992009-04-22 21:01:40 +00002004 def epilog(self):
2005 super(GatherLogsTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002006 self._parse_results(self.queue_entries)
showard9bb960b2009-11-19 01:02:11 +00002007 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002008
showard9bb960b2009-11-19 01:02:11 +00002009
2010 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002011 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002012 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002013 models.HostQueueEntry.Status.COMPLETED)
2014 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2015 else:
2016 final_success = False
2017 num_tests_failed = 0
2018
showard9bb960b2009-11-19 01:02:11 +00002019 reboot_after = self._job.reboot_after
2020 do_reboot = (
2021 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002022 self._final_status() == models.HostQueueEntry.Status.ABORTED
jamesrendd855242010-03-02 22:23:44 +00002023 or reboot_after == model_attributes.RebootAfter.ALWAYS
2024 or (reboot_after == model_attributes.RebootAfter.IF_ALL_TESTS_PASSED
showard9bb960b2009-11-19 01:02:11 +00002025 and final_success and num_tests_failed == 0))
2026
showardd1195652009-12-08 22:21:02 +00002027 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002028 if do_reboot:
2029 # don't pass the queue entry to the CleanupTask. if the cleanup
2030 # fails, the job doesn't care -- it's over.
2031 models.SpecialTask.objects.create(
2032 host=models.Host.objects.get(id=queue_entry.host.id),
2033 task=models.SpecialTask.Task.CLEANUP,
2034 requested_by=self._job.owner_model())
2035 else:
2036 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002037
2038
showard0bbfc212009-04-29 21:06:13 +00002039 def run(self):
showard597bfd32009-05-08 18:22:50 +00002040 autoserv_exit_code = self._autoserv_monitor.exit_code()
2041 # only run if Autoserv exited due to some signal. if we have no exit
2042 # code, assume something bad (and signal-like) happened.
2043 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002044 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002045 else:
2046 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002047
2048
mbligh4608b002010-01-05 18:22:35 +00002049class SelfThrottledPostJobTask(PostJobTask):
2050 """
2051 Special AgentTask subclass that maintains its own global process limit.
2052 """
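# Throttling works through the class-level counter below: run() only starts
# the process when _can_run_new_process() is true, tick() keeps retrying the
# start until a slot frees up, and finished() decrements the counter once a
# started process completes. Each concrete subclass caps itself via
# _max_processes().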
2053 _num_running_processes = 0
showarded2afea2009-07-07 20:54:07 +00002054
2055
mbligh4608b002010-01-05 18:22:35 +00002056 @classmethod
2057 def _increment_running_processes(cls):
2058 cls._num_running_processes += 1
mbligh16c722d2008-03-05 00:58:44 +00002059
mblighd5c95802008-03-05 00:33:46 +00002060
mbligh4608b002010-01-05 18:22:35 +00002061 @classmethod
2062 def _decrement_running_processes(cls):
2063 cls._num_running_processes -= 1
showard8cc058f2009-09-08 16:26:33 +00002064
2065
mbligh4608b002010-01-05 18:22:35 +00002066 @classmethod
2067 def _max_processes(cls):
2068 raise NotImplementedError
2069
2070
2071 @classmethod
2072 def _can_run_new_process(cls):
2073 return cls._num_running_processes < cls._max_processes()
2074
2075
2076 def _process_started(self):
2077 return bool(self.monitor)
2078
2079
2080 def tick(self):
2081 # override tick to keep trying to start until the process count goes
2082 # down and we can, at which point we revert to default behavior
2083 if self._process_started():
2084 super(SelfThrottledPostJobTask, self).tick()
2085 else:
2086 self._try_starting_process()
2087
2088
2089 def run(self):
2090 # override run() to not actually run unless we can
2091 self._try_starting_process()
2092
2093
2094 def _try_starting_process(self):
2095 if not self._can_run_new_process():
showard775300b2009-09-09 15:30:50 +00002096 return
2097
mbligh4608b002010-01-05 18:22:35 +00002098 # actually run the command
2099 super(SelfThrottledPostJobTask, self).run()
jamesren25663562010-04-27 18:00:55 +00002100 if self._process_started():
2101 self._increment_running_processes()
mblighd5c95802008-03-05 00:33:46 +00002102
mblighd5c95802008-03-05 00:33:46 +00002103
mbligh4608b002010-01-05 18:22:35 +00002104 def finished(self, success):
2105 super(SelfThrottledPostJobTask, self).finished(success)
2106 if self._process_started():
2107 self._decrement_running_processes()
showard8cc058f2009-09-08 16:26:33 +00002108
showard21baa452008-10-21 00:08:39 +00002109
mbligh4608b002010-01-05 18:22:35 +00002110class FinalReparseTask(SelfThrottledPostJobTask):
showardd1195652009-12-08 22:21:02 +00002111 def __init__(self, queue_entries):
2112 super(FinalReparseTask, self).__init__(queue_entries,
2113 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002114 # don't use _set_ids, since we don't want to set the host_ids
2115 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002116
2117
2118 def _generate_command(self, results_dir):
mbligh4608b002010-01-05 18:22:35 +00002119 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd1195652009-12-08 22:21:02 +00002120 results_dir]
2121
2122
2123 @property
2124 def num_processes(self):
2125 return 0 # don't include parser processes in accounting
2126
2127
2128 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002129 return drone_manager.PARSER_PID_FILE
showardd1195652009-12-08 22:21:02 +00002130
2131
showard97aed502008-11-04 02:01:24 +00002132 @classmethod
mbligh4608b002010-01-05 18:22:35 +00002133 def _max_processes(cls):
2134 return scheduler_config.config.max_parse_processes
showard97aed502008-11-04 02:01:24 +00002135
2136
2137 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002138 self._check_queue_entry_statuses(
2139 self.queue_entries,
2140 allowed_hqe_statuses=(models.HostQueueEntry.Status.PARSING,))
showard8cc058f2009-09-08 16:26:33 +00002141
showard97aed502008-11-04 02:01:24 +00002142 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002143
2144
2145 def epilog(self):
2146 super(FinalReparseTask, self).epilog()
mbligh4608b002010-01-05 18:22:35 +00002147 self._archive_results(self.queue_entries)
showard97aed502008-11-04 02:01:24 +00002148
2149
mbligh4608b002010-01-05 18:22:35 +00002150class ArchiveResultsTask(SelfThrottledPostJobTask):
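# When archiving fails, epilog() below writes the marker file named by
# _ARCHIVING_FAILED_FILE into the results directory, recording the archiver's
# exit code so the failure is visible next to the job's results.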
showarde1575b52010-01-15 00:21:12 +00002151 _ARCHIVING_FAILED_FILE = '.archiver_failed'
2152
mbligh4608b002010-01-05 18:22:35 +00002153 def __init__(self, queue_entries):
2154 super(ArchiveResultsTask, self).__init__(queue_entries,
2155 log_file_name='.archiving.log')
2156 # don't use _set_ids, since we don't want to set the host_ids
2157 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00002158
2159
mbligh4608b002010-01-05 18:22:35 +00002160 def _pidfile_name(self):
jamesrenc44ae992010-02-19 00:12:54 +00002161 return drone_manager.ARCHIVER_PID_FILE
showard97aed502008-11-04 02:01:24 +00002162
2163
mbligh4608b002010-01-05 18:22:35 +00002164 def _generate_command(self, results_dir):
 2165 return [_autoserv_path, '-p',
2166 '--pidfile-label=%s' % self._pidfile_label(), '-r', results_dir,
mblighe0cbc912010-03-11 18:03:07 +00002167 '--use-existing-results', '--control-filename=control.archive',
showard948eb302010-01-15 00:16:20 +00002168 os.path.join(drones.AUTOTEST_INSTALL_DIR, 'scheduler',
2169 'archive_results.control.srv')]
showard97aed502008-11-04 02:01:24 +00002170
2171
mbligh4608b002010-01-05 18:22:35 +00002172 @classmethod
2173 def _max_processes(cls):
2174 return scheduler_config.config.max_transfer_processes
showarda9545c02009-12-18 22:44:26 +00002175
2176
2177 def prolog(self):
mbligh4608b002010-01-05 18:22:35 +00002178 self._check_queue_entry_statuses(
2179 self.queue_entries,
2180 allowed_hqe_statuses=(models.HostQueueEntry.Status.ARCHIVING,))
2181
2182 super(ArchiveResultsTask, self).prolog()
showarda9545c02009-12-18 22:44:26 +00002183
2184
mbligh4608b002010-01-05 18:22:35 +00002185 def epilog(self):
2186 super(ArchiveResultsTask, self).epilog()
showard4076c632010-01-15 20:28:49 +00002187 if not self.success and self._paired_with_monitor().has_process():
showarde1575b52010-01-15 00:21:12 +00002188 failed_file = os.path.join(self._working_directory(),
2189 self._ARCHIVING_FAILED_FILE)
2190 paired_process = self._paired_with_monitor().get_process()
2191 _drone_manager.write_lines_to_file(
2192 failed_file, ['Archiving failed with exit code %s'
2193 % self.monitor.exit_code()],
2194 paired_with_process=paired_process)
mbligh4608b002010-01-05 18:22:35 +00002195 self._set_all_statuses(self._final_status())
showarda9545c02009-12-18 22:44:26 +00002196
2197
mbligh36768f02008-02-22 18:28:33 +00002198if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002199 main()