blob: caf60238677533f779f2f892bbe0cefc78214750 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
# Pidfile prefixes handed to utils.write_pid()/process_is_alive() so the
# scheduler and its watchdog ("babysitter") can detect running instances.
BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

# Overwritten with the results_dir command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
# Section of the global config holding database (and base_url) settings.
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# AUTOTEST_DIR in the environment overrides the path derived from __file__.
# (dict.has_key is the Python 2 idiom; this file predates Python 3.)
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# Pidfile names written into each execution's results directory by the
# processes the scheduler spawns.
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized in init()/main.
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000064
65
mbligh83c1e9e2009-05-01 23:10:41 +000066def _site_init_monitor_db_dummy():
67 return {}
68
69
def main():
    """
    Top-level entry point: run the scheduler, log any escaping exception,
    and always clean up our pidfile on the way out.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        # deliberate exits (e.g. scheduler disabled) propagate untouched
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)
showard27f33872009-04-07 18:20:53 +000081
82
def main_without_exception_handling():
    """
    Parse the command line, load configuration into module globals, start the
    status server, and run the dispatcher tick loop until _shutdown is set.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # exactly one positional argument (the results directory) is required
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enabled it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Allow a site-specific hook to run extra initialization; falls back to
    # the no-op dummy when no site_monitor_db module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a free-form list separated by whitespace/,/;/:
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # under test: use a dummy autoserv binary and the stress-test database
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # the loop only exits abnormally via exception; report it by email
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly shutdown: flush mail, stop the status server, release drones/DB
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000170
171
def setup_logging():
    """Configure scheduler logging, honoring AUTOTEST_SCHEDULER_LOG_DIR and
    AUTOTEST_SCHEDULER_LOG_NAME environment overrides when present."""
    env = os.environ
    target_dir = env.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    target_name = env.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    config = scheduler_logging_config.SchedulerLoggingConfig()
    logging_manager.configure_logging(config, log_dir=target_dir,
                                      logfile_name=target_name)
178
179
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main loop to exit after its current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000184
185
def init():
    """
    One-time scheduler startup: guard against a second running instance,
    connect the scheduler and Django database layers, install the SIGINT
    handler, and initialize the drone manager from the global config.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    # refuse to start if another monitor_db instance owns the pidfile
    if utils.process_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        # under --test, redirect all DB access to the stress-test database
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # fix: the local was previously named 'drones', shadowing the imported
    # autotest_lib.scheduler.drones module within this function
    drone_hosts = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hosts.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000220
221
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        # fall back to the queue entry's job when no Job was passed directly
        if job is None:
            job = queue_entry.job
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
244
245
class SchedulerError(Exception):
    """Raised by the HostScheduler when scheduling state becomes
    inconsistent."""
248
249
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against Ready hosts.

    refresh() snapshots scheduling state (available hosts, ACLs, dependency
    labels, label membership) into plain in-memory dicts; the
    find_eligible_* methods then consume that snapshot, popping hosts from
    self._hosts_available as they are handed out so no host is offered
    twice within a single scheduling cycle.
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # format a sequence of ids for interpolation into a SQL IN (...) body
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Run a two-column query (query must contain one %s placeholder for
        the id list) and fold the rows into a dict of sets; see
        _process_many2many_dict for the shape and the meaning of flip.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Turn (left_id, right_id) rows into {left_id: set(right_ids)}.
        With flip=True the mapping direction is reversed.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # job id -> set of ACL group ids that the job's owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # job id -> set of host ids that the job must not run on
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # host id -> set of ACL group ids that contain the host
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        @returns A (labels_to_hosts, hosts_to_labels) pair of id-set dicts,
        both derived from a single hosts_labels query.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # label id -> Label object, for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Rebuild the in-memory scheduling snapshot for this cycle, limited
        to the jobs referenced by pending_queue_entries.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job owner and the host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Reject a host carrying an only_if_needed label unless the job asked
        for that label (as a dependency or as its metahost label).
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # inconsistent data; log it and pick one arbitrarily
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        # NOTE: returns a falsy non-bool (None) for unknown hosts; callers
        # only use the result in boolean context
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        # a specific host was requested; hand it out only if still eligible
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        @returns A Host for this (non-atomic-group) entry, or None if no
        eligible host is currently available.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d.  Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
613
614
showard170873e2009-01-07 00:22:26 +0000615class Dispatcher(object):
    def __init__(self):
        # all Agents currently being managed, in insertion order
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # indexes mapping host id / queue entry id -> set of active Agents,
        # maintained by add_agent()/remove_agent()
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
showard915958d2009-04-22 21:00:58 +0000628 def initialize(self, recover_hosts=True):
629 self._periodic_cleanup.initialize()
630 self._24hr_upkeep.initialize()
631
jadmanski0afbb632008-06-06 21:10:57 +0000632 # always recover processes
633 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000634
jadmanski0afbb632008-06-06 21:10:57 +0000635 if recover_hosts:
636 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000637
638
    def tick(self):
        """
        Run one scheduler iteration.  The call order is deliberate: refresh
        drone state first, process aborts/reverifies and recurring runs,
        schedule new work, drive existing agents, then flush queued drone
        actions and notification emails.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000649
showard97aed502008-11-04 02:01:24 +0000650
mblighf3294cc2009-04-08 21:17:38 +0000651 def _run_cleanup(self):
652 self._periodic_cleanup.run_cleanup_maybe()
653 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000654
mbligh36768f02008-02-22 18:28:33 +0000655
showard170873e2009-01-07 00:22:26 +0000656 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
657 for object_id in object_ids:
658 agent_dict.setdefault(object_id, set()).add(agent)
659
660
661 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
662 for object_id in object_ids:
663 assert object_id in agent_dict
664 agent_dict[object_id].remove(agent)
665
666
jadmanski0afbb632008-06-06 21:10:57 +0000667 def add_agent(self, agent):
668 self._agents.append(agent)
669 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000670 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
671 self._register_agent_for_ids(self._queue_entry_agents,
672 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def get_agents_for_entry(self, queue_entry):
676 """
677 Find agents corresponding to the specified queue_entry.
678 """
showardd3dc1992009-04-22 21:01:40 +0000679 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000680
681
682 def host_has_agent(self, host):
683 """
684 Determine if there is currently an Agent present using this host.
685 """
686 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def remove_agent(self, agent):
690 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000691 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
692 agent)
693 self._unregister_agent_for_ids(self._queue_entry_agents,
694 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000695
696
jadmanski0afbb632008-06-06 21:10:57 +0000697 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000698 self._register_pidfiles()
699 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000700 self._recover_all_recoverable_entries()
showard6af73ad2009-07-28 20:00:58 +0000701 self._requeue_starting_entries()
showard6878e8b2009-07-20 22:37:45 +0000702 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000703 self._reverify_remaining_hosts()
704 # reinitialize drones after killing orphaned processes, since they can
705 # leave around files when they die
706 _drone_manager.execute_actions()
707 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000708
showard170873e2009-01-07 00:22:26 +0000709
710 def _register_pidfiles(self):
711 # during recovery we may need to read pidfiles for both running and
712 # parsing entries
713 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000714 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000715 special_tasks = models.SpecialTask.objects.filter(is_active=True)
716 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000717 for pidfile_name in _ALL_PIDFILE_NAMES:
718 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000719 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000720 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000721
722
showarded2afea2009-07-07 20:54:07 +0000723 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
724 run_monitor = PidfileRunMonitor()
725 run_monitor.attach_to_existing_process(execution_path,
726 pidfile_name=pidfile_name)
727 if run_monitor.has_process():
728 orphans.discard(run_monitor.get_process())
729 return run_monitor, '(process %s)' % run_monitor.get_process()
730 return None, 'without process'
731
732
showardd3dc1992009-04-22 21:01:40 +0000733 def _recover_entries_with_status(self, status, orphans, pidfile_name,
734 recover_entries_fn):
735 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000736 for queue_entry in queue_entries:
737 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000738 # synchronous job we've already recovered
739 continue
showardd3dc1992009-04-22 21:01:40 +0000740 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000741 run_monitor, process_string = self._get_recovery_run_monitor(
742 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000743
showarded2afea2009-07-07 20:54:07 +0000744 logging.info('Recovering %s entry %s %s',status.lower(),
745 ', '.join(str(entry) for entry in queue_entries),
746 process_string)
showardd3dc1992009-04-22 21:01:40 +0000747 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000748
749
showard6878e8b2009-07-20 22:37:45 +0000750 def _check_for_remaining_orphan_processes(self, orphans):
751 if not orphans:
752 return
753 subject = 'Unrecovered orphan autoserv processes remain'
754 message = '\n'.join(str(process) for process in orphans)
755 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000756
757 die_on_orphans = global_config.global_config.get_config_value(
758 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
759
760 if die_on_orphans:
761 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000762
showard170873e2009-01-07 00:22:26 +0000763
showardd3dc1992009-04-22 21:01:40 +0000764 def _recover_running_entries(self, orphans):
765 def recover_entries(job, queue_entries, run_monitor):
766 if run_monitor is not None:
showarded2afea2009-07-07 20:54:07 +0000767 queue_task = QueueTask(job=job, queue_entries=queue_entries,
768 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000769 self.add_agent(Agent(tasks=[queue_task],
770 num_processes=len(queue_entries)))
showard6878e8b2009-07-20 22:37:45 +0000771 else:
772 # we could do better, but this retains legacy behavior for now
773 for queue_entry in queue_entries:
774 logging.info('Requeuing running HQE %s since it has no '
775 'process' % queue_entry)
776 queue_entry.requeue()
showardd3dc1992009-04-22 21:01:40 +0000777
778 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000779 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000780 recover_entries)
781
782
783 def _recover_gathering_entries(self, orphans):
784 def recover_entries(job, queue_entries, run_monitor):
785 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000786 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000787 self.add_agent(Agent([gather_task]))
788
789 self._recover_entries_with_status(
790 models.HostQueueEntry.Status.GATHERING,
791 orphans, _CRASHINFO_PID_FILE, recover_entries)
792
793
794 def _recover_parsing_entries(self, orphans):
795 def recover_entries(job, queue_entries, run_monitor):
796 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000797 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000798 self.add_agent(Agent([reparse_task], num_processes=0))
799
800 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
801 orphans, _PARSER_PID_FILE,
802 recover_entries)
803
804
    def _recover_all_recoverable_entries(self):
        """Recover running/gathering/parsing entries and special tasks,
        then report any orphan autoserv processes that remain."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000812
showard97aed502008-11-04 02:01:24 +0000813
showarded2afea2009-07-07 20:54:07 +0000814 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000815 """\
816 Recovers all special tasks that have started running but have not
817 completed.
818 """
819
820 tasks = models.SpecialTask.objects.filter(is_active=True,
821 is_complete=False)
822 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000823 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000824 if self.host_has_agent(task.host):
825 raise SchedulerError(
826 "%s already has a host agent %s." % (
827 task, self._host_agents.get(host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000828
829 host = Host(id=task.host.id)
830 queue_entry = None
831 if task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000832 queue_entry = HostQueueEntry(id=task.queue_entry.id)
showard2fe3f1d2009-07-06 20:19:11 +0000833
showarded2afea2009-07-07 20:54:07 +0000834 run_monitor, process_string = self._get_recovery_run_monitor(
835 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
836
837 logging.info('Recovering %s %s', task, process_string)
838 self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000839
840
showarded2afea2009-07-07 20:54:07 +0000841 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000842 """\
843 Recovers a single special task.
844 """
845 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000846 agent_tasks = self._recover_verify(task, host, queue_entry,
847 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000848 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000849 agent_tasks = self._recover_repair(task, host, queue_entry,
850 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000851 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000852 agent_tasks = self._recover_cleanup(task, host, queue_entry,
853 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000854 else:
855 # Should never happen
856 logging.error(
857 "Special task id %d had invalid task %s", (task.id, task.task))
858
859 self.add_agent(Agent(agent_tasks))
860
861
showarded2afea2009-07-07 20:54:07 +0000862 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000863 """\
864 Recovers a verify task.
865 No associated queue entry: Verify host
866 With associated queue entry: Verify host, and run associated queue
867 entry
868 """
869 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000870 return [VerifyTask(host=host, task=task,
871 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000872 else:
showarded2afea2009-07-07 20:54:07 +0000873 return [VerifyTask(queue_entry=queue_entry, task=task,
874 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000875 SetEntryPendingTask(queue_entry=queue_entry)]
876
877
showarded2afea2009-07-07 20:54:07 +0000878 def _recover_repair(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000879 """\
880 Recovers a repair task.
881 Always repair host
882 """
showarded2afea2009-07-07 20:54:07 +0000883 return [RepairTask(host=host, queue_entry=queue_entry, task=task,
884 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000885
886
showarded2afea2009-07-07 20:54:07 +0000887 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000888 """\
889 Recovers a cleanup task.
890 No associated queue entry: Clean host
891 With associated queue entry: Clean host, verify host if needed, and
892 run associated queue entry
893 """
894 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000895 return [CleanupTask(host=host, task=task,
896 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000897 else:
898 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000899 task=task,
900 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000901 if queue_entry.job.should_run_verify(queue_entry):
902 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
903 agent_tasks.append(
904 SetEntryPendingTask(queue_entry=queue_entry))
905 return agent_tasks
906
907
showard6af73ad2009-07-28 20:00:58 +0000908 def _requeue_starting_entries(self):
909 # temporary measure until we implement proper recovery of Starting HQEs
910 for entry in HostQueueEntry.fetch(where='status="Starting"'):
911 logging.info('Requeuing "Starting" queue entry %s' % entry)
912 assert not self.get_agents_for_entry(entry)
913 assert entry.host.status == models.Host.Status.PENDING
914 self._reverify_hosts_where('id = %s' % entry.host.id)
915 entry.requeue()
916
917
showard6878e8b2009-07-20 22:37:45 +0000918 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000919 queue_entries = HostQueueEntry.fetch(
showard70a294f2009-08-20 23:33:21 +0000920 where='active AND NOT complete AND status != "Pending"')
showardd3dc1992009-04-22 21:01:40 +0000921
showarde8e37072009-08-20 23:31:30 +0000922 unrecovered_active_hqes = [entry for entry in queue_entries
923 if not self.get_agents_for_entry(entry)]
924 if unrecovered_active_hqes:
925 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
926 raise SchedulerError(
927 '%d unrecovered active host queue entries:\n%s' %
928 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000929
930
showard1ff7b2e2009-05-15 23:17:18 +0000931 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000932 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000933 task=models.SpecialTask.Task.VERIFY, is_active=False,
934 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000935
showard2fe3f1d2009-07-06 20:19:11 +0000936 for task in tasks:
937 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
938 if host.locked or host.invalid or self.host_has_agent(host):
939 continue
showard6d7b2ff2009-06-10 00:16:47 +0000940
showard2fe3f1d2009-07-06 20:19:11 +0000941 logging.info('Force reverifying host %s', host.hostname)
942 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000943
944
showard170873e2009-01-07 00:22:26 +0000945 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000946 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000947 # should never happen
showarded2afea2009-07-07 20:54:07 +0000948 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000949 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000950 self._reverify_hosts_where(
951 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
952 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000953
954
jadmanski0afbb632008-06-06 21:10:57 +0000955 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000956 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000957 full_where='locked = 0 AND invalid = 0 AND ' + where
958 for host in Host.fetch(where=full_where):
959 if self.host_has_agent(host):
960 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000961 continue
showard170873e2009-01-07 00:22:26 +0000962 if print_message:
showardb18134f2009-03-20 20:52:18 +0000963 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000964 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000965 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000966
967
jadmanski0afbb632008-06-06 21:10:57 +0000968 def _recover_hosts(self):
969 # recover "Repair Failed" hosts
970 message = 'Reverifying dead host %s'
971 self._reverify_hosts_where("status = 'Repair Failed'",
972 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000973
974
showard04c82c52008-05-29 19:38:12 +0000975
showardb95b1bd2008-08-15 18:11:04 +0000976 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000977 # prioritize by job priority, then non-metahost over metahost, then FIFO
978 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000979 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000980 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000981 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000982
983
showard89f84db2009-03-12 20:39:13 +0000984 def _refresh_pending_queue_entries(self):
985 """
986 Lookup the pending HostQueueEntries and call our HostScheduler
987 refresh() method given that list. Return the list.
988
989 @returns A list of pending HostQueueEntries sorted in priority order.
990 """
showard63a34772008-08-18 19:32:50 +0000991 queue_entries = self._get_pending_queue_entries()
992 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000993 return []
showardb95b1bd2008-08-15 18:11:04 +0000994
showard63a34772008-08-18 19:32:50 +0000995 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000996
showard89f84db2009-03-12 20:39:13 +0000997 return queue_entries
998
999
1000 def _schedule_atomic_group(self, queue_entry):
1001 """
1002 Schedule the given queue_entry on an atomic group of hosts.
1003
1004 Returns immediately if there are insufficient available hosts.
1005
1006 Creates new HostQueueEntries based off of queue_entry for the
1007 scheduled hosts and starts them all running.
1008 """
1009 # This is a virtual host queue entry representing an entire
1010 # atomic group, find a group and schedule their hosts.
1011 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1012 queue_entry)
1013 if not group_hosts:
1014 return
showardcbe6f942009-06-17 19:33:49 +00001015
1016 logging.info('Expanding atomic group entry %s with hosts %s',
1017 queue_entry,
1018 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001019 # The first assigned host uses the original HostQueueEntry
1020 group_queue_entries = [queue_entry]
1021 for assigned_host in group_hosts[1:]:
1022 # Create a new HQE for every additional assigned_host.
1023 new_hqe = HostQueueEntry.clone(queue_entry)
1024 new_hqe.save()
1025 group_queue_entries.append(new_hqe)
1026 assert len(group_queue_entries) == len(group_hosts)
1027 for queue_entry, host in itertools.izip(group_queue_entries,
1028 group_hosts):
1029 self._run_queue_entry(queue_entry, host)
1030
1031
1032 def _schedule_new_jobs(self):
1033 queue_entries = self._refresh_pending_queue_entries()
1034 if not queue_entries:
1035 return
1036
showard63a34772008-08-18 19:32:50 +00001037 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001038 if (queue_entry.atomic_group_id is None or
1039 queue_entry.host_id is not None):
1040 assigned_host = self._host_scheduler.find_eligible_host(
1041 queue_entry)
1042 if assigned_host:
1043 self._run_queue_entry(queue_entry, assigned_host)
1044 else:
1045 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001046
1047
    def _run_queue_entry(self, queue_entry, host):
        """Start pre-job tasks for *queue_entry* on *host* and track the
        resulting agent."""
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001051
1052
jadmanski0afbb632008-06-06 21:10:57 +00001053 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001054 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001055 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001056 for agent in self.get_agents_for_entry(entry):
1057 agent.abort()
1058 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001059
1060
showard324bf812009-01-20 23:23:38 +00001061 def _can_start_agent(self, agent, num_started_this_cycle,
1062 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001063 # always allow zero-process agents to run
1064 if agent.num_processes == 0:
1065 return True
1066 # don't allow any nonzero-process agents to run after we've reached a
1067 # limit (this avoids starvation of many-process agents)
1068 if have_reached_limit:
1069 return False
1070 # total process throttling
showard324bf812009-01-20 23:23:38 +00001071 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001072 return False
1073 # if a single agent exceeds the per-cycle throttling, still allow it to
1074 # run when it's the first agent in the cycle
1075 if num_started_this_cycle == 0:
1076 return True
1077 # per-cycle throttling
1078 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001079 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001080 return False
1081 return True
1082
1083
jadmanski0afbb632008-06-06 21:10:57 +00001084 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001085 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001086 have_reached_limit = False
1087 # iterate over copy, so we can remove agents during iteration
1088 for agent in list(self._agents):
1089 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001090 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001091 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001092 continue
1093 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001094 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001095 have_reached_limit):
1096 have_reached_limit = True
1097 continue
showard4c5374f2008-09-04 17:02:56 +00001098 num_started_this_cycle += agent.num_processes
1099 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001100 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001101 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001102
1103
showard29f7cd22009-04-29 21:16:24 +00001104 def _process_recurring_runs(self):
1105 recurring_runs = models.RecurringRun.objects.filter(
1106 start_date__lte=datetime.datetime.now())
1107 for rrun in recurring_runs:
1108 # Create job from template
1109 job = rrun.job
1110 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001111 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001112
1113 host_objects = info['hosts']
1114 one_time_hosts = info['one_time_hosts']
1115 metahost_objects = info['meta_hosts']
1116 dependencies = info['dependencies']
1117 atomic_group = info['atomic_group']
1118
1119 for host in one_time_hosts or []:
1120 this_host = models.Host.create_one_time_host(host.hostname)
1121 host_objects.append(this_host)
1122
1123 try:
1124 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001125 options=options,
showard29f7cd22009-04-29 21:16:24 +00001126 host_objects=host_objects,
1127 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001128 atomic_group=atomic_group)
1129
1130 except Exception, ex:
1131 logging.exception(ex)
1132 #TODO send email
1133
1134 if rrun.loop_count == 1:
1135 rrun.delete()
1136 else:
1137 if rrun.loop_count != 0: # if not infinite loop
1138 # calculate new start_date
1139 difference = datetime.timedelta(seconds=rrun.loop_period)
1140 rrun.start_date = rrun.start_date + difference
1141 rrun.loop_count -= 1
1142 rrun.save()
1143
1144
showard170873e2009-01-07 00:22:26 +00001145class PidfileRunMonitor(object):
1146 """
1147 Client must call either run() to start a new process or
1148 attach_to_existing_process().
1149 """
mbligh36768f02008-02-22 18:28:33 +00001150
showard170873e2009-01-07 00:22:26 +00001151 class _PidfileException(Exception):
1152 """
1153 Raised when there's some unexpected behavior with the pid file, but only
1154 used internally (never allowed to escape this class).
1155 """
mbligh36768f02008-02-22 18:28:33 +00001156
1157
showard170873e2009-01-07 00:22:26 +00001158 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001159 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001160 self._start_time = None
1161 self.pidfile_id = None
1162 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001163
1164
showard170873e2009-01-07 00:22:26 +00001165 def _add_nice_command(self, command, nice_level):
1166 if not nice_level:
1167 return command
1168 return ['nice', '-n', str(nice_level)] + command
1169
1170
1171 def _set_start_time(self):
1172 self._start_time = time.time()
1173
1174
1175 def run(self, command, working_directory, nice_level=None, log_file=None,
1176 pidfile_name=None, paired_with_pidfile=None):
1177 assert command is not None
1178 if nice_level is not None:
1179 command = ['nice', '-n', str(nice_level)] + command
1180 self._set_start_time()
1181 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001182 command, working_directory, pidfile_name=pidfile_name,
1183 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001184
1185
showarded2afea2009-07-07 20:54:07 +00001186 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001187 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001188 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001189 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001190 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001191 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001192
1193
jadmanski0afbb632008-06-06 21:10:57 +00001194 def kill(self):
showard170873e2009-01-07 00:22:26 +00001195 if self.has_process():
1196 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001197
mbligh36768f02008-02-22 18:28:33 +00001198
showard170873e2009-01-07 00:22:26 +00001199 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001200 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001201 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001202
1203
showard170873e2009-01-07 00:22:26 +00001204 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001205 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001206 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001207 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001208
1209
showard170873e2009-01-07 00:22:26 +00001210 def _read_pidfile(self, use_second_read=False):
1211 assert self.pidfile_id is not None, (
1212 'You must call run() or attach_to_existing_process()')
1213 contents = _drone_manager.get_pidfile_contents(
1214 self.pidfile_id, use_second_read=use_second_read)
1215 if contents.is_invalid():
1216 self._state = drone_manager.PidfileContents()
1217 raise self._PidfileException(contents)
1218 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001219
1220
showard21baa452008-10-21 00:08:39 +00001221 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001222 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1223 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001224 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001225 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001226
1227
1228 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001229 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001230 return
mblighbb421852008-03-11 22:36:16 +00001231
showard21baa452008-10-21 00:08:39 +00001232 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001233
showard170873e2009-01-07 00:22:26 +00001234 if self._state.process is None:
1235 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001236 return
mbligh90a549d2008-03-25 23:52:34 +00001237
showard21baa452008-10-21 00:08:39 +00001238 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001239 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001240 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001241 return
mbligh90a549d2008-03-25 23:52:34 +00001242
showard170873e2009-01-07 00:22:26 +00001243 # pid but no running process - maybe process *just* exited
1244 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001245 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001246 # autoserv exited without writing an exit code
1247 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001248 self._handle_pidfile_error(
1249 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001250
showard21baa452008-10-21 00:08:39 +00001251
1252 def _get_pidfile_info(self):
1253 """\
1254 After completion, self._state will contain:
1255 pid=None, exit_status=None if autoserv has not yet run
1256 pid!=None, exit_status=None if autoserv is running
1257 pid!=None, exit_status!=None if autoserv has completed
1258 """
1259 try:
1260 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001261 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001262 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001263
1264
showard170873e2009-01-07 00:22:26 +00001265 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001266 """\
1267 Called when no pidfile is found or no pid is in the pidfile.
1268 """
showard170873e2009-01-07 00:22:26 +00001269 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001270 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1271 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001272 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001273 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001274
1275
showard35162b02009-03-03 02:17:30 +00001276 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001277 """\
1278 Called when autoserv has exited without writing an exit status,
1279 or we've timed out waiting for autoserv to write a pid to the
1280 pidfile. In either case, we just return failure and the caller
1281 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001282
showard170873e2009-01-07 00:22:26 +00001283 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001284 """
1285 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001286 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001287 self._state.exit_status = 1
1288 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001289
1290
jadmanski0afbb632008-06-06 21:10:57 +00001291 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001292 self._get_pidfile_info()
1293 return self._state.exit_status
1294
1295
1296 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001297 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001298 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001299 if self._state.num_tests_failed is None:
1300 return -1
showard21baa452008-10-21 00:08:39 +00001301 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001302
1303
showardcdaeae82009-08-31 18:32:48 +00001304 def try_copy_results_on_drone(self, **kwargs):
1305 if self.has_process():
1306 # copy results logs into the normal place for job results
1307 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1308
1309
1310 def try_copy_to_results_repository(self, source, **kwargs):
1311 if self.has_process():
1312 _drone_manager.copy_to_results_repository(self.get_process(),
1313 source, **kwargs)
1314
1315
mbligh36768f02008-02-22 18:28:33 +00001316class Agent(object):
showard77182562009-06-10 00:16:05 +00001317 """
1318 An agent for use by the Dispatcher class to perform a sequence of tasks.
1319
1320 The following methods are required on all task objects:
1321 poll() - Called periodically to let the task check its status and
1322 update its internal state. If the task succeeded.
1323 is_done() - Returns True if the task is finished.
1324 abort() - Called when an abort has been requested. The task must
1325 set its aborted attribute to True if it actually aborted.
1326
1327 The following attributes are required on all task objects:
1328 aborted - bool, True if this task was aborted.
1329 failure_tasks - A sequence of tasks to be run using a new Agent
1330 by the dispatcher should this task fail.
1331 success - bool, True if this task succeeded.
1332 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1333 host_ids - A sequence of Host ids this task represents.
1334
1335 The following attribute is written to all task objects:
1336 agent - A reference to the Agent instance that the task has been
1337 added to.
1338 """
1339
1340
showard170873e2009-01-07 00:22:26 +00001341 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001342 """
1343 @param tasks: A list of tasks as described in the class docstring.
1344 @param num_processes: The number of subprocesses the Agent represents.
1345 This is used by the Dispatcher for managing the load on the
1346 system. Defaults to 1.
1347 """
jadmanski0afbb632008-06-06 21:10:57 +00001348 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001349 self.queue = None
showard77182562009-06-10 00:16:05 +00001350 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001351 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001352 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001353
showard170873e2009-01-07 00:22:26 +00001354 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1355 for task in tasks)
1356 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1357
showardd3dc1992009-04-22 21:01:40 +00001358 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001359 for task in tasks:
1360 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001361
1362
showardd3dc1992009-04-22 21:01:40 +00001363 def _clear_queue(self):
1364 self.queue = Queue.Queue(0)
1365
1366
showard170873e2009-01-07 00:22:26 +00001367 def _union_ids(self, id_lists):
1368 return set(itertools.chain(*id_lists))
1369
1370
jadmanski0afbb632008-06-06 21:10:57 +00001371 def add_task(self, task):
1372 self.queue.put_nowait(task)
1373 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001374
1375
jadmanski0afbb632008-06-06 21:10:57 +00001376 def tick(self):
showard21baa452008-10-21 00:08:39 +00001377 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001378 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001379 self.active_task.poll()
1380 if not self.active_task.is_done():
1381 return
1382 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001383
1384
jadmanski0afbb632008-06-06 21:10:57 +00001385 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001386 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001387 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001388 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001389 if not self.active_task.success:
1390 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001391 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001392
jadmanski0afbb632008-06-06 21:10:57 +00001393 if not self.is_done():
1394 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001395
1396
jadmanski0afbb632008-06-06 21:10:57 +00001397 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001398 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001399 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1400 # get reset.
1401 new_agent = Agent(self.active_task.failure_tasks)
1402 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001403
mblighe2586682008-02-29 22:45:46 +00001404
showard4c5374f2008-09-04 17:02:56 +00001405 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001406 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001407
1408
jadmanski0afbb632008-06-06 21:10:57 +00001409 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001410 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001411
1412
showardd3dc1992009-04-22 21:01:40 +00001413 def abort(self):
showard08a36412009-05-05 01:01:13 +00001414 # abort tasks until the queue is empty or a task ignores the abort
1415 while not self.is_done():
1416 if not self.active_task:
1417 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001418 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001419 if not self.active_task.aborted:
1420 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001421 return
1422 self.active_task = None
1423
showardd3dc1992009-04-22 21:01:40 +00001424
showard77182562009-06-10 00:16:05 +00001425class DelayedCallTask(object):
1426 """
1427 A task object like AgentTask for an Agent to run that waits for the
1428 specified amount of time to have elapsed before calling the supplied
1429 callback once and finishing. If the callback returns anything, it is
1430 assumed to be a new Agent instance and will be added to the dispatcher.
1431
1432 @attribute end_time: The absolute posix time after which this task will
1433 call its callback when it is polled and be finished.
1434
1435 Also has all attributes required by the Agent class.
1436 """
1437 def __init__(self, delay_seconds, callback, now_func=None):
1438 """
1439 @param delay_seconds: The delay in seconds from now that this task
1440 will call the supplied callback and be done.
1441 @param callback: A callable to be called by this task once after at
1442 least delay_seconds time has elapsed. It must return None
1443 or a new Agent instance.
1444 @param now_func: A time.time like function. Default: time.time.
1445 Used for testing.
1446 """
1447 assert delay_seconds > 0
1448 assert callable(callback)
1449 if not now_func:
1450 now_func = time.time
1451 self._now_func = now_func
1452 self._callback = callback
1453
1454 self.end_time = self._now_func() + delay_seconds
1455
1456 # These attributes are required by Agent.
1457 self.aborted = False
1458 self.failure_tasks = ()
1459 self.host_ids = ()
1460 self.success = False
1461 self.queue_entry_ids = ()
1462 # This is filled in by Agent.add_task().
1463 self.agent = None
1464
1465
1466 def poll(self):
1467 if self._callback and self._now_func() >= self.end_time:
1468 new_agent = self._callback()
1469 if new_agent:
1470 self.agent.dispatcher.add_agent(new_agent)
1471 self._callback = None
1472 self.success = True
1473
1474
1475 def is_done(self):
1476 return not self._callback
1477
1478
1479 def abort(self):
1480 self.aborted = True
1481 self._callback = None
1482
1483
mbligh36768f02008-02-22 18:28:33 +00001484class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001485 def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
1486 pidfile_name=None, paired_with_pidfile=None,
1487 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001488 self.done = False
1489 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001490 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001491 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001492 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001493 self.monitor = recover_run_monitor
1494 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001495 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001496 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001497 self.queue_entry_ids = []
1498 self.host_ids = []
1499 self.log_file = None
1500
1501
1502 def _set_ids(self, host=None, queue_entries=None):
1503 if queue_entries and queue_entries != [None]:
1504 self.host_ids = [entry.host.id for entry in queue_entries]
1505 self.queue_entry_ids = [entry.id for entry in queue_entries]
1506 else:
1507 assert host
1508 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001509
1510
jadmanski0afbb632008-06-06 21:10:57 +00001511 def poll(self):
showard08a36412009-05-05 01:01:13 +00001512 if not self.started:
1513 self.start()
1514 self.tick()
1515
1516
1517 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001518 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001519 exit_code = self.monitor.exit_code()
1520 if exit_code is None:
1521 return
1522 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001523 else:
1524 success = False
mbligh36768f02008-02-22 18:28:33 +00001525
jadmanski0afbb632008-06-06 21:10:57 +00001526 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001527
1528
jadmanski0afbb632008-06-06 21:10:57 +00001529 def is_done(self):
1530 return self.done
mbligh36768f02008-02-22 18:28:33 +00001531
1532
jadmanski0afbb632008-06-06 21:10:57 +00001533 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001534 if self.done:
1535 return
jadmanski0afbb632008-06-06 21:10:57 +00001536 self.done = True
1537 self.success = success
1538 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001539
1540
jadmanski0afbb632008-06-06 21:10:57 +00001541 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001542 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001543
mbligh36768f02008-02-22 18:28:33 +00001544
jadmanski0afbb632008-06-06 21:10:57 +00001545 def cleanup(self):
showardcdaeae82009-08-31 18:32:48 +00001546 if self.monitor and self.log_file:
1547 self.monitor.try_copy_to_results_repository(self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001548
1549
jadmanski0afbb632008-06-06 21:10:57 +00001550 def epilog(self):
1551 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001552
1553
jadmanski0afbb632008-06-06 21:10:57 +00001554 def start(self):
1555 assert self.agent
1556
1557 if not self.started:
1558 self.prolog()
1559 self.run()
1560
1561 self.started = True
1562
1563
1564 def abort(self):
1565 if self.monitor:
1566 self.monitor.kill()
1567 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001568 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001569 self.cleanup()
1570
1571
showarded2afea2009-07-07 20:54:07 +00001572 def _get_consistent_execution_path(self, execution_entries):
1573 first_execution_path = execution_entries[0].execution_path()
1574 for execution_entry in execution_entries[1:]:
1575 assert execution_entry.execution_path() == first_execution_path, (
1576 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1577 execution_entry,
1578 first_execution_path,
1579 execution_entries[0]))
1580 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001581
1582
showarded2afea2009-07-07 20:54:07 +00001583 def _copy_results(self, execution_entries, use_monitor=None):
1584 """
1585 @param execution_entries: list of objects with execution_path() method
1586 """
showard6d1c1432009-08-20 23:30:39 +00001587 if use_monitor is not None and not use_monitor.has_process():
1588 return
1589
showarded2afea2009-07-07 20:54:07 +00001590 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001591 if use_monitor is None:
1592 assert self.monitor
1593 use_monitor = self.monitor
1594 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001595 execution_path = self._get_consistent_execution_path(execution_entries)
1596 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001597 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001598
showarda1e74b32009-05-12 17:32:04 +00001599
1600 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001601 reparse_task = FinalReparseTask(queue_entries)
1602 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1603
1604
showarda1e74b32009-05-12 17:32:04 +00001605 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1606 self._copy_results(queue_entries, use_monitor)
1607 self._parse_results(queue_entries)
1608
1609
showardd3dc1992009-04-22 21:01:40 +00001610 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001611 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001612 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001613 self.monitor = PidfileRunMonitor()
1614 self.monitor.run(self.cmd, self._working_directory,
1615 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001616 log_file=self.log_file,
1617 pidfile_name=pidfile_name,
1618 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001619
1620
showardd9205182009-04-27 20:09:55 +00001621class TaskWithJobKeyvals(object):
1622 """AgentTask mixin providing functionality to help with job keyval files."""
1623 _KEYVAL_FILE = 'keyval'
1624 def _format_keyval(self, key, value):
1625 return '%s=%s' % (key, value)
1626
1627
1628 def _keyval_path(self):
1629 """Subclasses must override this"""
1630 raise NotImplemented
1631
1632
1633 def _write_keyval_after_job(self, field, value):
1634 assert self.monitor
1635 if not self.monitor.has_process():
1636 return
1637 _drone_manager.write_lines_to_file(
1638 self._keyval_path(), [self._format_keyval(field, value)],
1639 paired_with_process=self.monitor.get_process())
1640
1641
1642 def _job_queued_keyval(self, job):
1643 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1644
1645
1646 def _write_job_finished(self):
1647 self._write_keyval_after_job("job_finished", int(time.time()))
1648
1649
showarded2afea2009-07-07 20:54:07 +00001650class SpecialAgentTask(AgentTask):
1651 """
1652 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1653 """
1654
1655 TASK_TYPE = None
1656 host = None
1657 queue_entry = None
1658
1659 def __init__(self, task, extra_command_args, **kwargs):
1660 assert self.host
1661 assert (self.TASK_TYPE is not None,
1662 'self.TASK_TYPE must be overridden')
1663 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001664 if task:
1665 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001666 self._extra_command_args = extra_command_args
1667 super(SpecialAgentTask, self).__init__(**kwargs)
1668
1669
1670 def prolog(self):
1671 super(SpecialAgentTask, self).prolog()
1672 self.task = models.SpecialTask.prepare(self, self.task)
1673 self.cmd = _autoserv_command_line(self.host.hostname,
1674 self._extra_command_args,
1675 queue_entry=self.queue_entry)
1676 self._working_directory = self.task.execution_path()
1677 self.task.activate()
1678
1679
showardb6681aa2009-07-08 21:15:00 +00001680 def cleanup(self):
1681 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001682
1683 # self.task can be None if a SpecialAgentTask is aborted before the
1684 # prolog runs
1685 if self.task:
1686 self.task.finish()
1687
1688 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001689 self._copy_results([self.task])
1690
1691
class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, host, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """
        Repair the given host, optionally failing a queue entry if the
        repair itself fails.

        @param host: the host to repair.
        @param queue_entry: queue entry to mark failed if this repair fails.
        """
        # autoserv wants the normalized attribute name of the protection
        # level, not its display string
        protection_string = host_protections.Protection.get_string(
                host.protection)
        protection = host_protections.Protection.get_attr_name(
                protection_string)

        self.host = host
        self.queue_entry = queue_entry

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status('Repairing')


    def _keyval_path(self):
        """Keyvals live directly in this task's working directory."""
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail self.queue_entry after an unsuccessful repair, writing job
        keyvals and copying results so the failure is properly recorded."""
        entry = self.queue_entry
        assert entry

        if entry.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        entry.update_from_database()
        if entry.status != 'Queued':
            return  # entry has been aborted

        entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()

        self.monitor.try_copy_results_on_drone(
                source_path=self._working_directory + '/',
                destination_path=entry.execution_path() + '/')

        self._copy_results([entry])
        if entry.job.parse_failed_repair:
            self._parse_results([entry])
        entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
            return

        self.host.set_status('Repair Failed')
        if self.queue_entry:
            self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001762
1763
showarded2afea2009-07-07 20:54:07 +00001764class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001765 def epilog(self):
1766 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001767 should_copy_results = (self.queue_entry and not self.success
1768 and not self.queue_entry.meta_host)
1769 if should_copy_results:
1770 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001771 log_name = os.path.basename(self.task.execution_path())
1772 source = os.path.join(self.task.execution_path(), 'debug',
1773 'autoserv.DEBUG')
1774 destination = os.path.join(self.queue_entry.execution_path(),
1775 log_name)
showardcdaeae82009-08-31 18:32:48 +00001776
1777 self.monitor.try_copy_to_results_repository(
1778 source, destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001779
showard58721a82009-08-20 23:32:40 +00001780 if not self.success and self.queue_entry:
1781 self.queue_entry.requeue()
1782
showard8fe93b52008-11-18 17:53:22 +00001783
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, queue_entry=None, host=None, task=None,
                 recover_run_monitor=None):
        """Verify a host, either standalone or on behalf of a queue entry.

        Exactly one of queue_entry or host must be supplied.
        """
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        # if the verify fails, repair the host (failing the entry, if any)
        repair = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(
                task, ['-v'], failure_tasks=[repair],
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        pending_verifies = models.SpecialTask.objects.filter(
                host__id=self.host.id,
                task=models.SpecialTask.Task.VERIFY,
                is_active=False, is_complete=False)
        pending_verifies.exclude(id=self.task.id).delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001824
1825
showardb5626452009-06-30 01:57:28 +00001826class CleanupHostsMixin(object):
1827 def _reboot_hosts(self, job, queue_entries, final_success,
1828 num_tests_failed):
1829 reboot_after = job.reboot_after
1830 do_reboot = (
1831 # always reboot after aborted jobs
1832 self._final_status == models.HostQueueEntry.Status.ABORTED
1833 or reboot_after == models.RebootAfter.ALWAYS
1834 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1835 and final_success and num_tests_failed == 0))
1836
1837 for queue_entry in queue_entries:
1838 if do_reboot:
1839 # don't pass the queue entry to the CleanupTask. if the cleanup
1840 # fails, the job doesn't care -- it's over.
1841 cleanup_task = CleanupTask(host=queue_entry.host)
1842 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1843 else:
1844 queue_entry.host.set_status('Ready')
1845
1846
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """Runs the main job process for a set of queue entries, writing
    job-level and per-host keyvals before it starts and finish/abort
    bookkeeping when it ends."""
    def __init__(self, job, queue_entries, cmd=None, group_name='',
                 recover_run_monitor=None):
        """
        @param job: the Job being executed.
        @param queue_entries: the HostQueueEntries covered by this run;
                they share a single execution path (the first entry's).
        @param cmd: command to run (passed through to AgentTask).
        @param group_name: if set, written as the 'host_group_name' keyval.
        @param recover_run_monitor: existing monitor to adopt when
                recovering an already-running process.
        """
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(
                cmd, self._execution_path(),
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        """Job keyvals live at the root of the execution path."""
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict (rendered as key=value lines) to the
        execution at keyval_path, before the job process exists."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write job-level keyvals to the standard keyval file."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        """Write one host's platform/labels keyvals under host_keyvals/."""
        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_path(self):
        # all entries share one execution path; use the first entry's
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Write pre-job keyvals and flip entries/hosts to Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            # mark the host dirty so it gets cleaned after the job
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Record a 'job_failure' marker when autoserv's process was lost."""
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: write the job_finished keyval, note a lost
        process if any, and hand off to a GatherLogsTask in a new Agent."""
        if not self.monitor:
            return

        self._write_job_finished()

        if self.monitor.lost_process:
            self._write_lost_process_error_file()

        gather_task = GatherLogsTask(self.job, self.queue_entries)
        self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))


    def _write_status_comment(self, comment):
        """Append an INFO line to status.log, paired with our process."""
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job (keyvals + status.log comment)."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no entry recorded an aborter; attribute it to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001973
1974
showardd3dc1992009-04-22 21:01:40 +00001975class PostJobTask(AgentTask):
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        """
        @param queue_entries: HostQueueEntries this post-job step covers;
                they must all share a single execution path.
        @param pidfile_name: pidfile name the post-job process will use.
        @param logfile_name: name (within the execution path) of this
                step's log file.
        @param recover_run_monitor: existing monitor to adopt when
                recovering an already-running process.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
                queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # attach to the finished autoserv process so the new process can be
        # paired with it and its exit status can be read
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            # run a no-op command instead of the real post-job command
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
                cmd=command, working_directory=self._execution_path,
                recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()
1999
2000
2001 def _generate_command(self, results_dir):
2002 raise NotImplementedError('Subclasses must override this')
2003
2004
2005 def _job_was_aborted(self):
2006 was_aborted = None
2007 for queue_entry in self._queue_entries:
2008 queue_entry.update_from_database()
2009 if was_aborted is None: # first queue entry
2010 was_aborted = bool(queue_entry.aborted)
2011 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2012 email_manager.manager.enqueue_notify_email(
2013 'Inconsistent abort state',
2014 'Queue entries have inconsistent abort state: ' +
2015 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2016 # don't crash here, just assume true
2017 return True
2018 return was_aborted
2019
2020
2021 def _determine_final_status(self):
2022 if self._job_was_aborted():
2023 return models.HostQueueEntry.Status.ABORTED
2024
2025 # we'll use a PidfileRunMonitor to read the autoserv exit status
2026 if self._autoserv_monitor.exit_code() == 0:
2027 return models.HostQueueEntry.Status.COMPLETED
2028 return models.HostQueueEntry.Status.FAILED
2029
2030
2031 def run(self):
showard5add1c82009-05-26 19:27:46 +00002032 # make sure we actually have results to work with.
2033 # this should never happen in normal operation.
2034 if not self._autoserv_monitor.has_process():
2035 email_manager.manager.enqueue_notify_email(
2036 'No results in post-job task',
2037 'No results in post-job task at %s' %
2038 self._autoserv_monitor.pidfile_id)
2039 self.finished(False)
2040 return
2041
2042 super(PostJobTask, self).run(
2043 pidfile_name=self._pidfile_name,
2044 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002045
2046
2047 def _set_all_statuses(self, status):
2048 for queue_entry in self._queue_entries:
2049 queue_entry.set_status(status)
2050
2051
2052 def abort(self):
2053 # override AgentTask.abort() to avoid killing the process and ending
2054 # the task. post-job tasks continue when the job is aborted.
2055 pass
2056
2057
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, recover_run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log',
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # run collect_crashinfo against every host in the job
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo',
                '-m', ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        super(GatherLogsTask, self).epilog()

        self._copy_and_parse_results(self._queue_entries,
                                     use_monitor=self._autoserv_monitor)

        # without an autoserv process we can't know test results; treat the
        # job as failed with no test failures recorded
        if not self._autoserv_monitor.has_process():
            final_success = False
            num_tests_failed = 0
        else:
            final_success = (self._final_status ==
                             models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()

        self._reboot_hosts(self._job, self._queue_entries, final_success,
                           num_tests_failed)


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only gather crash info if Autoserv died from a signal; a missing
        # exit code is assumed to mean something equally bad happened.
        if autoserv_exit_code is not None and \
                not os.WIFSIGNALED(autoserv_exit_code):
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002113
2114
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' against a host, typically before a job.

    On failure, falls back to a RepairTask for the same host.
    """
    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, host=None, queue_entry=None, task=None,
                 recover_run_monitor=None):
        # exactly one of host / queue_entry must be provided; when given a
        # queue entry, the host is taken from it
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        # if cleanup fails, this repair task will run instead
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(
                task, ['--cleanup'], failure_tasks=[repair_task],
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()

        # on success the host is clean again: mark Ready and clear dirty flag
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
2147
2148
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a completed job's results directory.

    A class-level counter limits how many parsers run concurrently; an
    instance keeps retrying on each tick() until it can claim a slot.
    """
    _num_running_parses = 0

    def __init__(self, queue_entries, recover_run_monitor=None):
        super(FinalReparseTask, self).__init__(
                queue_entries, pidfile_name=_PARSER_PID_FILE,
                logfile_name='.parse.log',
                recover_run_monitor=recover_run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = self.started


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # limited by the max_parse_processes scheduler config value
        limit = scheduler_config.config.max_parse_processes
        return cls._num_running_parses < limit


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # until the parse has actually started, each tick is another chance
        # to claim a parser slot; afterwards, behave like a normal task.
        if not self._parse_started:
            self._try_starting_parse()
        else:
            super(FinalReparseTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually run the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a slot if we ever claimed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002222
2223
class SetEntryPendingTask(AgentTask):
    """Dummy task (no process) that moves a queue entry to Pending.

    Runs at the end of the pre-job task chain; on_pending() may hand back a
    follow-up Agent which gets scheduled with the dispatcher.
    """
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        self.finished(True)
2236
2237
class DBError(Exception):
    """Raised when a DBObject's underlying SELECT finds no matching row."""
2240
2241
class DBObject(object):
    """A miniature object relational model for the database.

    Each subclass maps to one table; instances map to rows.  A per-(type, id)
    weak-value cache ensures at most one live instance per row.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - primary key of the row to load (exclusive with row).
        @param row - pre-fetched sequence of column values (exclusive with id).
        @param new_record - True for an object not yet saved to the database.
        @param always_query - if False and this (cached) instance was already
                initialized, skip the redundant re-query.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with the given primary key.

        @raises DBError if no matching row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # a length mismatch means _fields is out of sync with the schema
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is assigned by the database, never via update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read our row from the database and refresh all attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table=None):
        """Return the number of rows in `table` (default: ours) matching
        the given WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """UPDATE a single column of this row; no-op if value is unchanged."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object as a new row (only when new_record is set)."""
        if self.__new_record:
            keys = self._fields[1:]  # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE: values are interpolated directly into the SQL rather than
            # passed as parameters -- only safe for trusted, scheduler-
            # generated values.
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """DELETE this row and evict this instance from the cache."""
        # Bug fix: this previously popped (type(self), id) using the builtin
        # id() instead of self.id, so the cache entry was never removed and a
        # later constructor call could return this stale, deleted object.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        # helper for fetch(): prepend the keyword only if the clause is
        # non-empty
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002440
mbligh36768f02008-02-22 18:28:33 +00002441
class IneligibleHostQueue(DBObject):
    """A row recording that a host is blocked from (re)use by a given job."""
    # NOTE: field order must exactly match the table's column order.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002445
2446
class AtomicGroup(DBObject):
    """A row of the atomic_groups table (groups of hosts scheduled as one)."""
    # NOTE: field order must exactly match the table's column order.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002451
2452
class Label(DBObject):
    """A row of the labels table (platforms, host attributes, atomic groups)."""
    # NOTE: field order must exactly match the table's column order.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
                self.name, self.id, self.atomic_group_id)
2462
2463
class Host(DBObject):
    """A row of the hosts table, plus scheduler-side host helpers."""
    # NOTE: field order must exactly match the table's column order.
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self,status):
        """Update the host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status',status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] to re-verify this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)

        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() already ignores leading zeros; the previous
            # lstrip('0') turned an all-zero suffix (e.g. "host0") into ''
            # and made int('') raise ValueError.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2542
2543
mbligh36768f02008-02-22 18:28:33 +00002544class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002545 _table_name = 'host_queue_entries'
2546 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002547 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002548 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002549
2550
    def __init__(self, id=None, row=None, **kwargs):
        """Load this queue entry plus its associated Job, Host and
        AtomicGroup objects (host/atomic_group are None when unset)."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        if self.atomic_group_id:
            # always_query=False: reuse a cached AtomicGroup if available
            self.atomic_group = AtomicGroup(self.atomic_group_id,
                                            always_query=False)
        else:
            self.atomic_group = None

        # per-entry scheduling log, written via the drone manager
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002569
2570
showard89f84db2009-03-12 20:39:13 +00002571 @classmethod
2572 def clone(cls, template):
2573 """
2574 Creates a new row using the values from a template instance.
2575
2576 The new instance will not exist in the database or have a valid
2577 id attribute until its save() method is called.
2578 """
2579 assert isinstance(template, cls)
2580 new_row = [getattr(template, field) for field in cls._fields]
2581 clone = cls(row=new_row, new_record=True)
2582 clone.id = None
2583 return clone
2584
2585
    def _view_job_url(self):
        """Return the frontend URL for viewing this entry's job.

        _base_url is a module-level value read from the global config.
        """
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2588
2589
showardf1ae3542009-05-11 19:26:02 +00002590 def get_labels(self):
2591 """
2592 Get all labels associated with this host queue entry (either via the
2593 meta_host or as a job dependency label). The labels yielded are not
2594 guaranteed to be unique.
2595
2596 @yields Label instances associated with this host_queue_entry.
2597 """
2598 if self.meta_host:
2599 yield Label(id=self.meta_host, always_query=False)
2600 labels = Label.fetch(
2601 joins="JOIN jobs_dependency_labels AS deps "
2602 "ON (labels.id = deps.label_id)",
2603 where="deps.job_id = %d" % self.job.id)
2604 for label in labels:
2605 yield label
2606
2607
    def set_host(self, host):
        """Assign a host to this entry, or release the current one if host
        is None/falsy.  Assigning also blocks the host against reuse by this
        job; releasing unblocks it."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host
mbligh36768f02008-02-22 18:28:33 +00002620
2621
    def get_host(self):
        """Return the Host assigned to this entry (None if unassigned)."""
        return self.host
mbligh36768f02008-02-22 18:28:33 +00002624
2625
    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file (written
        through the drone manager)."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002630
2631
jadmanski0afbb632008-06-06 21:10:57 +00002632 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002633 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002634 row = [0, self.job.id, host_id]
2635 block = IneligibleHostQueue(row=row, new_record=True)
2636 block.save()
mblighe2586682008-02-29 22:45:46 +00002637
2638
jadmanski0afbb632008-06-06 21:10:57 +00002639 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002640 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002641 blocks = IneligibleHostQueue.fetch(
2642 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2643 for block in blocks:
2644 block.delete()
mblighe2586682008-02-29 22:45:46 +00002645
2646
    def set_execution_subdir(self, subdir=None):
        """Set this entry's execution_subdir; defaults to the assigned
        host's hostname, which therefore must exist when subdir is omitted."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002652
2653
showard6355f6b2008-12-05 18:52:13 +00002654 def _get_hostname(self):
2655 if self.host:
2656 return self.host.hostname
2657 return 'no host'
2658
2659
    def __str__(self):
        # e.g. "myhost/12 (345)": hostname, job id, then queue entry id
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2662
2663
jadmanski0afbb632008-06-06 21:10:57 +00002664 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002665 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002666
showardb18134f2009-03-20 20:52:18 +00002667 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002668
showardc85c21b2008-11-24 22:17:37 +00002669 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002670 self.update_field('complete', False)
2671 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002672
jadmanski0afbb632008-06-06 21:10:57 +00002673 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002674 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002675 self.update_field('complete', False)
2676 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002677
showardc85c21b2008-11-24 22:17:37 +00002678 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002679 self.update_field('complete', True)
2680 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002681
2682 should_email_status = (status.lower() in _notify_email_statuses or
2683 'all' in _notify_email_statuses)
2684 if should_email_status:
2685 self._email_on_status(status)
2686
2687 self._email_on_job_complete()
2688
2689
    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002699
2700
2701 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002702 if not self.job.is_finished():
2703 return
showard542e8402008-09-19 20:16:18 +00002704
showardc85c21b2008-11-24 22:17:37 +00002705 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002706 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002707 for queue_entry in hosts_queue:
2708 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002709 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002710 queue_entry.status))
2711
2712 summary_text = "\n".join(summary_text)
2713 status_counts = models.Job.objects.get_status_counts(
2714 [self.job.id])[self.job.id]
2715 status = ', '.join('%d %s' % (count, status) for status, count
2716 in status_counts.iteritems())
2717
2718 subject = 'Autotest: Job ID: %s "%s" %s' % (
2719 self.job.id, self.job.name, status)
2720 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2721 self.job.id, self.job.name, status, self._view_job_url(),
2722 summary_text)
showard170873e2009-01-07 00:22:26 +00002723 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002724
2725
showard77182562009-06-10 00:16:05 +00002726 def run_pre_job_tasks(self, assigned_host=None):
2727 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002728 assert assigned_host
2729 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002730 if self.host_id is None:
2731 self.set_host(assigned_host)
2732 else:
2733 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002734
showardcfd4a7e2009-07-11 01:47:33 +00002735 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002736 self.job.name, self.meta_host, self.atomic_group_id,
2737 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002738
showard77182562009-06-10 00:16:05 +00002739 return self._do_run_pre_job_tasks()
2740
2741
2742 def _do_run_pre_job_tasks(self):
2743 # Every host goes thru the Verifying stage (which may or may not
2744 # actually do anything as determined by get_pre_job_tasks).
2745 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2746
2747 # The pre job tasks always end with a SetEntryPendingTask which
2748 # will continue as appropriate through queue_entry.on_pending().
2749 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002750
showard6ae5ea92009-02-25 00:11:51 +00002751
jadmanski0afbb632008-06-06 21:10:57 +00002752 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002753 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002754 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002755 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002756 # verify/cleanup failure sets the execution subdir, so reset it here
2757 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002758 if self.meta_host:
2759 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002760
2761
jadmanski0afbb632008-06-06 21:10:57 +00002762 def handle_host_failure(self):
2763 """\
2764 Called when this queue entry's host has failed verification and
2765 repair.
2766 """
2767 assert not self.meta_host
2768 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002769 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002770
2771
jadmanskif7fa2cc2008-10-01 14:13:23 +00002772 @property
2773 def aborted_by(self):
2774 self._load_abort_info()
2775 return self._aborted_by
2776
2777
2778 @property
2779 def aborted_on(self):
2780 self._load_abort_info()
2781 return self._aborted_on
2782
2783
2784 def _load_abort_info(self):
2785 """ Fetch info about who aborted the job. """
2786 if hasattr(self, "_aborted_by"):
2787 return
2788 rows = _db.execute("""
2789 SELECT users.login, aborted_host_queue_entries.aborted_on
2790 FROM aborted_host_queue_entries
2791 INNER JOIN users
2792 ON users.id = aborted_host_queue_entries.aborted_by_id
2793 WHERE aborted_host_queue_entries.queue_entry_id = %s
2794 """, (self.id,))
2795 if rows:
2796 self._aborted_by, self._aborted_on = rows[0]
2797 else:
2798 self._aborted_by = self._aborted_on = None
2799
2800
showardb2e2c322008-10-14 17:33:55 +00002801 def on_pending(self):
2802 """
2803 Called when an entry in a synchronous job has passed verify. If the
2804 job is ready to run, returns an agent to run the job. Returns None
2805 otherwise.
2806 """
2807 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002808 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002809
2810 # Some debug code here: sends an email if an asynchronous job does not
2811 # immediately enter Starting.
2812 # TODO: Remove this once we figure out why asynchronous jobs are getting
2813 # stuck in Pending.
2814 agent = self.job.run_if_ready(queue_entry=self)
2815 if self.job.synch_count == 1 and agent is None:
2816 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2817 message = 'Asynchronous job stuck in Pending'
2818 email_manager.manager.enqueue_notify_email(subject, message)
2819 return agent
showardb2e2c322008-10-14 17:33:55 +00002820
2821
showardd3dc1992009-04-22 21:01:40 +00002822 def abort(self, dispatcher):
2823 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002824
showardd3dc1992009-04-22 21:01:40 +00002825 Status = models.HostQueueEntry.Status
2826 has_running_job_agent = (
2827 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2828 and dispatcher.get_agents_for_entry(self))
2829 if has_running_job_agent:
2830 # do nothing; post-job tasks will finish and then mark this entry
2831 # with status "Aborted" and take care of the host
2832 return
2833
2834 if self.status in (Status.STARTING, Status.PENDING):
2835 self.host.set_status(models.Host.Status.READY)
2836 elif self.status == Status.VERIFYING:
2837 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2838
2839 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002840
2841 def execution_tag(self):
2842 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002843 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002844
2845
showarded2afea2009-07-07 20:54:07 +00002846 def execution_path(self):
2847 return self.execution_tag()
2848
2849
class Job(DBObject):
    """In-memory representation of a row of the afe jobs table.

    Provides readiness checks, host-group selection and the Agent/Task
    construction needed to launch the job's autoserv process.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 is the client-side control file type; everything
        # else is treated as a server job.  (Matches the afe models enum --
        # TODO confirm against frontend.afe.models.Job.ControlType.)
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag used for results directories."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """@returns All HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job is created with at least one queue entry.
        assert entries

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status, optionally propagating to all its HQEs."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _hosts_assigned_count(self):
        """The number of HostQueueEntries assigned a Host for this job."""
        entries = models.HostQueueEntry.objects.filter(job=self.id,
                                                       host__isnull=False)
        return entries.count()


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        """@returns True if enough entries are Pending for the job to run."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        # Use the local result rather than re-querying; the original code
        # called _atomic_and_has_started() a second time here, issuing a
        # redundant pair of DB count queries on every readiness poll.
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by *clause*."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of queue entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of currently active queue entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of completed queue entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """@returns True when every queue entry of this job is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """@returns A Django queryset of this job's not-yet-started HQEs."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all queued/pending entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # Release the host that was being held for this entry.
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job if it can no longer satisfy its synch_count."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append *queue_entry*'s hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Attach this job's control file to *execution_tag* on the drone."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """@returns All HQEs sharing an execution_subdir with the given one."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for running *queue_entries*."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self, verbose=False)

        if not self.is_server_job():
            # -c tells autoserv this is a client-side control file.
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """@returns True if a CleanupTask should precede this job's run."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        """@returns True if a VerifyTask should precede this job's run."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered be ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign a fresh execution_subdir to all of *queue_entries*."""
        if len(queue_entries) == 1:
            # Single-host runs use the hostname itself as the subdir.
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
            logging.info('Running synchronous job %d hosts %s as %s',
                self.id, [entry.host.hostname for entry in queue_entries],
                group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
                are ready for it to run.
        @returns None and potentially cleans up excess hosts if this Job
                is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, the a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = min(self._hosts_assigned_count(),
                                queue_entry.atomic_group.max_number_of_machines)
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        logging.info('Job %s waiting up to %s seconds for more hosts.',
                     self.id, delay)
        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark *queue_entries* Starting and build the Agent to run them."""
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
3252
3253
# Script entry point; main() is defined earlier in this file.
if __name__ == '__main__':
    main()