#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref, gc
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config
from autotest_lib.scheduler import gc_stats

BABYSITTER_PID_FILE_PREFIX = 'monitor_db_babysitter'
PID_FILE_PREFIX = 'monitor_db'

RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _get_pidfile_timeout_secs():
    """@returns How long to wait for autoserv to write pidfile."""
    pidfile_timeout_mins = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'pidfile_timeout_mins', type=int)
    return pidfile_timeout_mins * 60
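
# Illustrative note: assuming pidfile_timeout_mins were set to 5 in the
# SCHEDULER section of the global config, _get_pidfile_timeout_secs() would
# return 300, i.e. the scheduler waits up to five minutes for a freshly
# launched autoserv to write its pidfile.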


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        try:
            main_without_exception_handling()
        except SystemExit:
            raise
        except:
            logging.exception('Exception escaping in monitor_db')
            raise
    finally:
        utils.delete_pid_file_if_exists(PID_FILE_PREFIX)


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if utils.program_is_alive(PID_FILE_PREFIX):
        logging.critical("monitor_db already running, aborting!")
        sys.exit(1)
    utils.write_pid(PID_FILE_PREFIX)

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect(db_type='django')

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p',
                     '-r', drone_manager.WORKING_DIRECTORY]
    if machines:
        autoserv_argv += ['-m', machines]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args
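
# Illustrative example (hypothetical job values): for a job owned by
# 'debug_user' named 'sleeptest' targeting 'host1',
#     _autoserv_command_line('host1', ['-P', 'tag', '-c', 'control'], job=job)
# returns roughly
#     [_autoserv_path, '-p', '-r', drone_manager.WORKING_DIRECTORY,
#      '-m', 'host1', '-u', 'debug_user', '-l', 'sleeptest', '--verbose',
#      '-P', 'tag', '-c', 'control']
# where the '-P'/'-c' extra_args shown are placeholders, not required flags.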


class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result
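
    # Illustrative example: given rows ((1, 10), (1, 11), (2, 10)),
    # _process_many2many_dict returns {1: set([10, 11]), 2: set([10])};
    # with flip=True it returns {10: set([1, 2]), 11: set([1])}.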


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True
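
    # Illustrative example (hypothetical label name): if a host carries an
    # only_if_needed label such as 'rare_hardware', a metahost entry that
    # neither depends on that label nor requested it as its metahost gets
    # False here, reserving the host for jobs that actually need it.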


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise. Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing. Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

447 @returns A generator yeilding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling. Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend. Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
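
    # Illustrative walk-through (hypothetical numbers): for a job with
    # synch_count=3 against an atomic group whose max_number_of_machines is 5,
    # a label with 7 usable, eligible hosts passes the min_hosts check and is
    # truncated to the first 5 sorted hosts; those 5 are popped from
    # _hosts_available and returned as the group to run on.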


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}
        self._tick_count = 0
        self._last_garbage_stats_time = time.time()
        self._seconds_between_garbage_stats = 60 * (
            global_config.global_config.get_config_value(
                scheduler_config.CONFIG_SECTION,
                'gc_stats_interval_mins', type=int, default=6*60))


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        self._garbage_collection()
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._process_recurring_runs()
        self._schedule_delay_tasks()
        self._schedule_running_host_queue_entries()
        self._schedule_special_tasks()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
        self._tick_count += 1


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _garbage_collection(self):
        threshold_time = time.time() - self._seconds_between_garbage_stats
        if threshold_time < self._last_garbage_stats_time:
            # Don't generate these reports very often.
            return

        self._last_garbage_stats_time = time.time()
        # Force a full level 0 collection (because we can, it doesn't hurt
        # at this interval).
        gc.collect()
        logging.info('Logging garbage collector stats on tick %d.',
                     self._tick_count)
        gc_stats._log_garbage_collector_stats()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent_task(self, agent_task):
        agent = Agent(agent_task)
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _host_has_scheduled_special_task(self, host):
        return bool(models.SpecialTask.objects.filter(host__id=host.id,
                                                      is_active=False,
                                                      is_complete=False))


    def _recover_processes(self):
        agent_tasks = self._create_recovery_agent_tasks()
        self._register_pidfiles(agent_tasks)
        _drone_manager.refresh()
        self._recover_tasks(agent_tasks)
        self._recover_pending_entries()
        self._check_for_unrecovered_verifying_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _create_recovery_agent_tasks(self):
        return (self._get_queue_entry_agent_tasks()
                + self._get_special_task_agent_tasks(is_active=True))


    def _get_queue_entry_agent_tasks(self):
        # host queue entry statuses handled directly by AgentTasks (Verifying is
        # handled through SpecialTasks, so is not listed here)
        statuses = (models.HostQueueEntry.Status.STARTING,
                    models.HostQueueEntry.Status.RUNNING,
                    models.HostQueueEntry.Status.GATHERING,
                    models.HostQueueEntry.Status.PARSING)
        status_list = ','.join("'%s'" % status for status in statuses)
        queue_entries = HostQueueEntry.fetch(
            where='status IN (%s)' % status_list)

        agent_tasks = []
        used_queue_entries = set()
        for entry in queue_entries:
            if self.get_agents_for_entry(entry):
                # already being handled
                continue
            if entry in used_queue_entries:
                # already picked up by a synchronous job
                continue
            agent_task = self._get_agent_task_for_queue_entry(entry)
            agent_tasks.append(agent_task)
            used_queue_entries.update(agent_task.queue_entries)
        return agent_tasks


    def _get_special_task_agent_tasks(self, is_active=False):
        special_tasks = models.SpecialTask.objects.filter(
            is_active=is_active, is_complete=False)
        return [self._get_agent_task_for_special_task(task)
                for task in special_tasks]


    def _get_agent_task_for_queue_entry(self, queue_entry):
        """
        Construct an AgentTask instance for the given active HostQueueEntry,
        if one can currently run it.
        @param queue_entry: a HostQueueEntry
        @returns an AgentTask to run the queue entry
        """
        task_entries = queue_entry.job.get_group_entries(queue_entry)
        self._check_for_duplicate_host_entries(task_entries)

        if queue_entry.status in (models.HostQueueEntry.Status.STARTING,
                                  models.HostQueueEntry.Status.RUNNING):
            if queue_entry.is_hostless():
                return HostlessQueueTask(queue_entry=queue_entry)
            return QueueTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.GATHERING:
            return GatherLogsTask(queue_entries=task_entries)
        if queue_entry.status == models.HostQueueEntry.Status.PARSING:
            return FinalReparseTask(queue_entries=task_entries)

        raise SchedulerError('_get_agent_task_for_queue_entry got entry with '
                             'invalid status %s: %s'
                             % (queue_entry.status, queue_entry))


    def _check_for_duplicate_host_entries(self, task_entries):
        parsing_status = models.HostQueueEntry.Status.PARSING
        for task_entry in task_entries:
            using_host = (task_entry.host is not None
                          and task_entry.status != parsing_status)
            if using_host:
                self._assert_host_has_no_agent(task_entry)


    def _assert_host_has_no_agent(self, entry):
        """
        @param entry: a HostQueueEntry or a SpecialTask
        """
        if self.host_has_agent(entry.host):
            agent = tuple(self._host_agents.get(entry.host.id))[0]
            raise SchedulerError(
                'While scheduling %s, host %s already has a host agent %s'
                % (entry, entry.host, agent.task))


    def _get_agent_task_for_special_task(self, special_task):
        """
        Construct an AgentTask class to run the given SpecialTask and add it
        to this dispatcher.
        @param special_task: a models.SpecialTask instance
        @returns an AgentTask to run this SpecialTask
        """
        self._assert_host_has_no_agent(special_task)

        special_agent_task_classes = (CleanupTask, VerifyTask, RepairTask)
        for agent_task_class in special_agent_task_classes:
            if agent_task_class.TASK_TYPE == special_task.task:
                return agent_task_class(task=special_task)

        raise SchedulerError('No AgentTask class for task', str(special_task))


    def _register_pidfiles(self, agent_tasks):
        for agent_task in agent_tasks:
            agent_task.register_necessary_pidfiles()


    def _recover_tasks(self, agent_tasks):
        orphans = _drone_manager.get_orphaned_autoserv_processes()

        for agent_task in agent_tasks:
            agent_task.recover()
            if agent_task.monitor and agent_task.monitor.has_process():
                orphans.discard(agent_task.monitor.get_process())
            self.add_agent_task(agent_task)

        self._check_for_remaining_orphan_processes(orphans)


    def _get_unassigned_entries(self, status):
        for entry in HostQueueEntry.fetch(where="status = '%s'" % status):
            if entry.status == status and not self.get_agents_for_entry(entry):
                # The status can change during iteration, e.g., if job.run()
                # sets a group of queue entries to Starting
                yield entry


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_pending_entries(self):
        for entry in self._get_unassigned_entries(
                models.HostQueueEntry.Status.PENDING):
            logging.info('Recovering Pending entry %s', entry)
            entry.on_pending()


    def _check_for_unrecovered_verifying_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='status = "%s"' % models.HostQueueEntry.Status.VERIFYING)
        unrecovered_hqes = []
        for queue_entry in queue_entries:
            special_tasks = models.SpecialTask.objects.filter(
                task__in=(models.SpecialTask.Task.CLEANUP,
                          models.SpecialTask.Task.VERIFY),
                queue_entry__id=queue_entry.id,
                is_complete=False)
            if special_tasks.count() == 0:
                unrecovered_hqes.append(queue_entry)

        if unrecovered_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_hqes)
            raise SchedulerError(
                '%d unrecovered verifying host queue entries:\n%s' %
                (len(unrecovered_hqes), message))


    def _get_prioritized_special_tasks(self):
        """
        Returns all queued SpecialTasks prioritized for repair first, then
        cleanup, then verify.
        """
        queued_tasks = models.SpecialTask.objects.filter(is_active=False,
                                                         is_complete=False,
                                                         host__locked=False)
        # exclude hosts with active queue entries unless the SpecialTask is for
        # that queue entry
        queued_tasks = models.Host.objects.add_join(
            queued_tasks, 'host_queue_entries', 'host_id',
            join_condition='host_queue_entries.active',
            force_left_join=True)
        queued_tasks = queued_tasks.extra(
            where=['(host_queue_entries.id IS NULL OR '
                   'host_queue_entries.id = special_tasks.queue_entry_id)'])

        # reorder tasks by priority
        task_priority_order = [models.SpecialTask.Task.REPAIR,
                               models.SpecialTask.Task.CLEANUP,
                               models.SpecialTask.Task.VERIFY]
        def task_priority_key(task):
            return task_priority_order.index(task.task)
        return sorted(queued_tasks, key=task_priority_key)
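
    # Illustrative example: if the queue holds a Verify, a Repair and a
    # Cleanup task, the sort above yields them as [Repair, Cleanup, Verify],
    # since task_priority_key indexes Repair before Cleanup before Verify.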


    def _schedule_special_tasks(self):
        """
        Execute queued SpecialTasks that are ready to run on idle hosts.
        """
        for task in self._get_prioritized_special_tasks():
            if self.host_has_agent(task.host):
                continue
            self.add_agent_task(self._get_agent_task_for_special_task(task))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if self._host_has_scheduled_special_task(host):
                # host will have a special task scheduled on the next cycle
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            models.SpecialTask.objects.create(
                task=models.SpecialTask.Task.CLEANUP,
                host=models.Host.objects.get(id=host.id))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)


    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list. Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_hostless_job(self, queue_entry):
        self.add_agent_task(HostlessQueueTask(queue_entry))


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            is_unassigned_atomic_group = (
                queue_entry.atomic_group_id is not None
                and queue_entry.host_id is None)
            if is_unassigned_atomic_group:
                self._schedule_atomic_group(queue_entry)
            elif queue_entry.is_hostless():
                self._schedule_hostless_job(queue_entry)
            else:
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host and not self.host_has_agent(assigned_host):
                    self._run_queue_entry(queue_entry, assigned_host)


    def _schedule_running_host_queue_entries(self):
        for agent_task in self._get_queue_entry_agent_tasks():
            self.add_agent_task(agent_task)


    def _schedule_delay_tasks(self):
        for entry in HostQueueEntry.fetch(where='status = "%s"' %
                                          models.HostQueueEntry.Status.WAITING):
            task = entry.job.schedule_delayed_callback_task(entry)
            if task:
                self.add_agent_task(task)


    def _run_queue_entry(self, queue_entry, host):
        queue_entry.schedule_pre_job_tasks(assigned_host=host)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.task.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        max_runnable_processes = _drone_manager.max_runnable_processes(
            agent.task.owner_username)
        if agent.task.num_processes > max_runnable_processes:
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.task.num_processes >
                scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
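
    # Illustrative example (hypothetical limits): with
    # max_processes_started_per_cycle = 200 and 195 processes already started
    # this cycle, an agent whose task needs 10 processes is deferred
    # (195 + 10 > 200), while a zero-process agent is always allowed and the
    # first agent of a cycle may exceed the per-cycle limit on its own.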


    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if not agent.started:
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.task.num_processes
            agent.tick()
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, num_processes, nice_level=None,
            log_file=None, pidfile_name=None, paired_with_pidfile=None,
            username=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            num_processes=num_processes, log_file=log_file,
            paired_with_pidfile=paired_with_pidfile, username=username)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE,
                                   num_processes=None):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        if num_processes is not None:
            _drone_manager.declare_process_count(self.pidfile_id, num_processes)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001260 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001261 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001262
1263
1264 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001265 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001266 return
mblighbb421852008-03-11 22:36:16 +00001267
showard21baa452008-10-21 00:08:39 +00001268 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001269
showard170873e2009-01-07 00:22:26 +00001270 if self._state.process is None:
1271 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001272 return
mbligh90a549d2008-03-25 23:52:34 +00001273
showard21baa452008-10-21 00:08:39 +00001274 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001275 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001276 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001277 return
mbligh90a549d2008-03-25 23:52:34 +00001278
showard170873e2009-01-07 00:22:26 +00001279 # pid but no running process - maybe process *just* exited
1280 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001281 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001282 # autoserv exited without writing an exit code
1283 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001284 self._handle_pidfile_error(
1285 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001286
showard21baa452008-10-21 00:08:39 +00001287
1288 def _get_pidfile_info(self):
1289 """\
1290 After completion, self._state will contain:
1291        process=None, exit_status=None if autoserv has not yet run
1292        process!=None, exit_status=None if autoserv is running
1293        process!=None, exit_status!=None if autoserv has completed
1294 """
1295 try:
1296 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001297 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001298 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001299
1300
showard170873e2009-01-07 00:22:26 +00001301 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001302 """\
1303 Called when no pidfile is found or no pid is in the pidfile.
1304 """
showard170873e2009-01-07 00:22:26 +00001305 message = 'No pid found at %s' % self.pidfile_id
showardec6a3b92009-09-25 20:29:13 +00001306 if time.time() - self._start_time > _get_pidfile_timeout_secs():
showard170873e2009-01-07 00:22:26 +00001307 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001308 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001309 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001310
1311
showard35162b02009-03-03 02:17:30 +00001312 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001313 """\
1314 Called when autoserv has exited without writing an exit status,
1315 or we've timed out waiting for autoserv to write a pid to the
1316 pidfile. In either case, we just return failure and the caller
1317 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001318
showard170873e2009-01-07 00:22:26 +00001319 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001320 """
1321 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001322 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001323 self._state.exit_status = 1
1324 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001325
1326
jadmanski0afbb632008-06-06 21:10:57 +00001327 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001328 self._get_pidfile_info()
1329 return self._state.exit_status
1330
1331
1332 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001333 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001334 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001335 if self._state.num_tests_failed is None:
1336 return -1
showard21baa452008-10-21 00:08:39 +00001337 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001338
1339
showardcdaeae82009-08-31 18:32:48 +00001340 def try_copy_results_on_drone(self, **kwargs):
1341 if self.has_process():
1342 # copy results logs into the normal place for job results
1343 _drone_manager.copy_results_on_drone(self.get_process(), **kwargs)
1344
1345
1346 def try_copy_to_results_repository(self, source, **kwargs):
1347 if self.has_process():
1348 _drone_manager.copy_to_results_repository(self.get_process(),
1349 source, **kwargs)
1350
1351
mbligh36768f02008-02-22 18:28:33 +00001352class Agent(object):
showard77182562009-06-10 00:16:05 +00001353 """
showard8cc058f2009-09-08 16:26:33 +00001354 An agent for use by the Dispatcher class to perform a task.
showard77182562009-06-10 00:16:05 +00001355
1356 The following methods are required on all task objects:
1357 poll() - Called periodically to let the task check its status and
1358                update its internal state.
1359 is_done() - Returns True if the task is finished.
1360 abort() - Called when an abort has been requested. The task must
1361 set its aborted attribute to True if it actually aborted.
1362
1363 The following attributes are required on all task objects:
1364 aborted - bool, True if this task was aborted.
showard77182562009-06-10 00:16:05 +00001365 success - bool, True if this task succeeded.
1366 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1367        host_ids - A sequence of Host ids this task represents.
            num_processes - int, the number of processes this task forks,
                    counted against the scheduler's per-cycle throttling limit.
showard77182562009-06-10 00:16:05 +00001368 """
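    # Concrete task objects handed to this class are either AgentTask
    # subclasses (QueueTask, SpecialAgentTask, PostJobTask, ...) or the
    # DelayedCallTask defined below; all of them provide the poll(),
    # is_done() and abort() methods and the attributes listed above.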
1369
1370
showard418785b2009-11-23 20:19:59 +00001371 def __init__(self, task):
showard77182562009-06-10 00:16:05 +00001372 """
showard8cc058f2009-09-08 16:26:33 +00001373 @param task: A task as described in the class docstring.
showard77182562009-06-10 00:16:05 +00001374 """
showard8cc058f2009-09-08 16:26:33 +00001375 self.task = task
showard8cc058f2009-09-08 16:26:33 +00001376
showard77182562009-06-10 00:16:05 +00001377 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001378 self.dispatcher = None
jadmanski0afbb632008-06-06 21:10:57 +00001379
showard8cc058f2009-09-08 16:26:33 +00001380 self.queue_entry_ids = task.queue_entry_ids
1381 self.host_ids = task.host_ids
showard170873e2009-01-07 00:22:26 +00001382
showard8cc058f2009-09-08 16:26:33 +00001383 self.started = False
showard9bb960b2009-11-19 01:02:11 +00001384 self.finished = False
mbligh36768f02008-02-22 18:28:33 +00001385
1386
jadmanski0afbb632008-06-06 21:10:57 +00001387 def tick(self):
showard8cc058f2009-09-08 16:26:33 +00001388 self.started = True
showard9bb960b2009-11-19 01:02:11 +00001389 if not self.finished:
showard8cc058f2009-09-08 16:26:33 +00001390 self.task.poll()
1391 if self.task.is_done():
showard9bb960b2009-11-19 01:02:11 +00001392 self.finished = True
showardec113162008-05-08 00:52:49 +00001393
1394
jadmanski0afbb632008-06-06 21:10:57 +00001395 def is_done(self):
showard9bb960b2009-11-19 01:02:11 +00001396 return self.finished
mbligh36768f02008-02-22 18:28:33 +00001397
1398
showardd3dc1992009-04-22 21:01:40 +00001399 def abort(self):
showard8cc058f2009-09-08 16:26:33 +00001400 if self.task:
1401 self.task.abort()
1402 if self.task.aborted:
showard08a36412009-05-05 01:01:13 +00001403 # tasks can choose to ignore aborts
showard9bb960b2009-11-19 01:02:11 +00001404 self.finished = True
showard20f9bdd2009-04-29 19:48:33 +00001405
showardd3dc1992009-04-22 21:01:40 +00001406
showard77182562009-06-10 00:16:05 +00001407class DelayedCallTask(object):
1408 """
1409 A task object like AgentTask for an Agent to run that waits for the
1410 specified amount of time to have elapsed before calling the supplied
1411 callback once and finishing. If the callback returns anything, it is
1412 assumed to be a new Agent instance and will be added to the dispatcher.
1413
1414 @attribute end_time: The absolute posix time after which this task will
1415 call its callback when it is polled and be finished.
1416
1417 Also has all attributes required by the Agent class.
1418 """
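    # Illustrative usage sketch (the callback body and the dispatcher variable
    # are hypothetical; Agent and Dispatcher.add_agent() exist in this module):
    #   delay_task = DelayedCallTask(delay_seconds=30,
    #                                callback=lambda: logging.info('expired'))
    #   dispatcher.add_agent(Agent(delay_task))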
1419 def __init__(self, delay_seconds, callback, now_func=None):
1420 """
1421 @param delay_seconds: The delay in seconds from now that this task
1422 will call the supplied callback and be done.
1423 @param callback: A callable to be called by this task once after at
1424 least delay_seconds time has elapsed. It must return None
1425 or a new Agent instance.
1426 @param now_func: A time.time like function. Default: time.time.
1427 Used for testing.
1428 """
1429 assert delay_seconds > 0
1430 assert callable(callback)
1431 if not now_func:
1432 now_func = time.time
1433 self._now_func = now_func
1434 self._callback = callback
1435
1436 self.end_time = self._now_func() + delay_seconds
1437
1438 # These attributes are required by Agent.
1439 self.aborted = False
showard77182562009-06-10 00:16:05 +00001440 self.host_ids = ()
1441 self.success = False
1442 self.queue_entry_ids = ()
showard418785b2009-11-23 20:19:59 +00001443 self.num_processes = 0
showard77182562009-06-10 00:16:05 +00001444
1445
1446 def poll(self):
showard8cc058f2009-09-08 16:26:33 +00001447 if not self.is_done() and self._now_func() >= self.end_time:
1448 self._callback()
showard77182562009-06-10 00:16:05 +00001449 self.success = True
1450
1451
1452 def is_done(self):
showard8cc058f2009-09-08 16:26:33 +00001453 return self.success or self.aborted
showard77182562009-06-10 00:16:05 +00001454
1455
1456 def abort(self):
1457 self.aborted = True
showard77182562009-06-10 00:16:05 +00001458
1459
mbligh36768f02008-02-22 18:28:33 +00001460class AgentTask(object):
showardd1195652009-12-08 22:21:02 +00001461 class _NullMonitor(object):
1462 pidfile_id = None
1463
1464 def has_process(self):
1465 return True
1466
1467
1468 def __init__(self, log_file_name=None):
showard9bb960b2009-11-19 01:02:11 +00001469 """
showardd1195652009-12-08 22:21:02 +00001470 @param log_file_name: (optional) name of file to log command output to
showard9bb960b2009-11-19 01:02:11 +00001471 """
jadmanski0afbb632008-06-06 21:10:57 +00001472 self.done = False
showardd1195652009-12-08 22:21:02 +00001473 self.started = False
jadmanski0afbb632008-06-06 21:10:57 +00001474 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001475 self.aborted = False
showardd1195652009-12-08 22:21:02 +00001476 self.monitor = None
showard170873e2009-01-07 00:22:26 +00001477 self.queue_entry_ids = []
1478 self.host_ids = []
showardd1195652009-12-08 22:21:02 +00001479 self._log_file_name = log_file_name
showard170873e2009-01-07 00:22:26 +00001480
1481
1482 def _set_ids(self, host=None, queue_entries=None):
1483 if queue_entries and queue_entries != [None]:
1484 self.host_ids = [entry.host.id for entry in queue_entries]
1485 self.queue_entry_ids = [entry.id for entry in queue_entries]
1486 else:
1487 assert host
1488 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001489
1490
jadmanski0afbb632008-06-06 21:10:57 +00001491 def poll(self):
showard08a36412009-05-05 01:01:13 +00001492 if not self.started:
1493 self.start()
showardd1195652009-12-08 22:21:02 +00001494 if not self.done:
1495 self.tick()
showard08a36412009-05-05 01:01:13 +00001496
1497
1498 def tick(self):
showardd1195652009-12-08 22:21:02 +00001499 assert self.monitor
1500 exit_code = self.monitor.exit_code()
1501 if exit_code is None:
1502 return
mbligh36768f02008-02-22 18:28:33 +00001503
showardd1195652009-12-08 22:21:02 +00001504 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001505 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001506
1507
jadmanski0afbb632008-06-06 21:10:57 +00001508 def is_done(self):
1509 return self.done
mbligh36768f02008-02-22 18:28:33 +00001510
1511
jadmanski0afbb632008-06-06 21:10:57 +00001512 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001513 if self.done:
showardd1195652009-12-08 22:21:02 +00001514 assert self.started
showard08a36412009-05-05 01:01:13 +00001515 return
showardd1195652009-12-08 22:21:02 +00001516 self.started = True
jadmanski0afbb632008-06-06 21:10:57 +00001517 self.done = True
1518 self.success = success
1519 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001520
1521
jadmanski0afbb632008-06-06 21:10:57 +00001522 def prolog(self):
showardd1195652009-12-08 22:21:02 +00001523 """
1524 To be overridden.
1525 """
showarded2afea2009-07-07 20:54:07 +00001526 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001527 self.register_necessary_pidfiles()
1528
1529
1530 def _log_file(self):
1531 if not self._log_file_name:
1532 return None
1533 return os.path.join(self._working_directory(), self._log_file_name)
mblighd64e5702008-04-04 21:39:28 +00001534
mbligh36768f02008-02-22 18:28:33 +00001535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def cleanup(self):
showardd1195652009-12-08 22:21:02 +00001537 log_file = self._log_file()
1538 if self.monitor and log_file:
1539 self.monitor.try_copy_to_results_repository(log_file)
mbligh36768f02008-02-22 18:28:33 +00001540
1541
jadmanski0afbb632008-06-06 21:10:57 +00001542 def epilog(self):
showardd1195652009-12-08 22:21:02 +00001543 """
1544 To be overridden.
1545 """
jadmanski0afbb632008-06-06 21:10:57 +00001546 self.cleanup()
showarda9545c02009-12-18 22:44:26 +00001547 logging.info("%s finished with success=%s", type(self).__name__,
1548 self.success)
1549
mbligh36768f02008-02-22 18:28:33 +00001550
1551
jadmanski0afbb632008-06-06 21:10:57 +00001552 def start(self):
jadmanski0afbb632008-06-06 21:10:57 +00001553 if not self.started:
1554 self.prolog()
1555 self.run()
1556
1557 self.started = True
1558
1559
1560 def abort(self):
1561 if self.monitor:
1562 self.monitor.kill()
1563 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001564 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001565 self.cleanup()
1566
1567
showarded2afea2009-07-07 20:54:07 +00001568 def _get_consistent_execution_path(self, execution_entries):
1569 first_execution_path = execution_entries[0].execution_path()
1570 for execution_entry in execution_entries[1:]:
1571 assert execution_entry.execution_path() == first_execution_path, (
1572 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1573 execution_entry,
1574 first_execution_path,
1575 execution_entries[0]))
1576 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001577
1578
showarded2afea2009-07-07 20:54:07 +00001579 def _copy_results(self, execution_entries, use_monitor=None):
1580 """
1581 @param execution_entries: list of objects with execution_path() method
1582 """
showard6d1c1432009-08-20 23:30:39 +00001583 if use_monitor is not None and not use_monitor.has_process():
1584 return
1585
showarded2afea2009-07-07 20:54:07 +00001586 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001587 if use_monitor is None:
1588 assert self.monitor
1589 use_monitor = self.monitor
1590 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001591 execution_path = self._get_consistent_execution_path(execution_entries)
1592 results_path = execution_path + '/'
showardcdaeae82009-08-31 18:32:48 +00001593 use_monitor.try_copy_to_results_repository(results_path)
showardde634ee2009-01-30 01:44:24 +00001594
showarda1e74b32009-05-12 17:32:04 +00001595
1596 def _parse_results(self, queue_entries):
showard8cc058f2009-09-08 16:26:33 +00001597 for queue_entry in queue_entries:
1598 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
showardde634ee2009-01-30 01:44:24 +00001599
1600
showarda1e74b32009-05-12 17:32:04 +00001601 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1602 self._copy_results(queue_entries, use_monitor)
1603 self._parse_results(queue_entries)
1604
1605
showardd1195652009-12-08 22:21:02 +00001606 def _command_line(self):
1607 """
1608 Return the command line to run. Must be overridden.
1609 """
1610 raise NotImplementedError
1611
1612
1613 @property
1614 def num_processes(self):
1615 """
1616 Return the number of processes forked by this AgentTask's process. It
1617 may only be approximate. To be overridden if necessary.
1618 """
1619 return 1
1620
1621
1622 def _paired_with_monitor(self):
1623 """
1624 If this AgentTask's process must run on the same machine as some
1625 previous process, this method should be overridden to return a
1626 PidfileRunMonitor for that process.
1627 """
1628 return self._NullMonitor()
1629
1630
1631 @property
1632 def owner_username(self):
1633 """
1634 Return login of user responsible for this task. May be None. Must be
1635 overridden.
1636 """
1637 raise NotImplementedError
1638
1639
1640 def _working_directory(self):
1641 """
1642 Return the directory where this AgentTask's process executes. Must be
1643 overridden.
1644 """
1645 raise NotImplementedError
1646
1647
1648 def _pidfile_name(self):
1649 """
1650 Return the name of the pidfile this AgentTask's process uses. To be
1651 overridden if necessary.
1652 """
1653 return _AUTOSERV_PID_FILE
1654
1655
1656 def _check_paired_results_exist(self):
1657 if not self._paired_with_monitor().has_process():
1658 email_manager.manager.enqueue_notify_email(
1659 'No paired results in task',
1660 'No paired results in task %s at %s'
1661 % (self, self._paired_with_monitor().pidfile_id))
1662 self.finished(False)
1663 return False
1664 return True
1665
1666
1667 def _create_monitor(self):
showarded2afea2009-07-07 20:54:07 +00001668 assert not self.monitor
showardd1195652009-12-08 22:21:02 +00001669 self.monitor = PidfileRunMonitor()
1670
1671
1672 def run(self):
1673 if not self._check_paired_results_exist():
1674 return
1675
1676 self._create_monitor()
1677 self.monitor.run(
1678 self._command_line(), self._working_directory(),
1679 num_processes=self.num_processes,
1680 nice_level=AUTOSERV_NICE_LEVEL, log_file=self._log_file(),
1681 pidfile_name=self._pidfile_name(),
1682 paired_with_pidfile=self._paired_with_monitor().pidfile_id,
1683 username=self.owner_username)
1684
1685
1686 def register_necessary_pidfiles(self):
1687 pidfile_id = _drone_manager.get_pidfile_id_from(
1688 self._working_directory(), self._pidfile_name())
1689 _drone_manager.register_pidfile(pidfile_id)
1690
1691 paired_pidfile_id = self._paired_with_monitor().pidfile_id
1692 if paired_pidfile_id:
1693 _drone_manager.register_pidfile(paired_pidfile_id)
1694
1695
1696 def recover(self):
1697 if not self._check_paired_results_exist():
1698 return
1699
1700 self._create_monitor()
1701 self.monitor.attach_to_existing_process(
1702 self._working_directory(), pidfile_name=self._pidfile_name(),
1703 num_processes=self.num_processes)
1704 if not self.monitor.has_process():
1705 # no process to recover; wait to be started normally
1706 self.monitor = None
1707 return
1708
1709 self.started = True
1710        logging.info('Recovering process %s for %s at %s',
1711                     self.monitor.get_process(), type(self).__name__,
1712                     self._working_directory())
mbligh36768f02008-02-22 18:28:33 +00001713
1714
showardd9205182009-04-27 20:09:55 +00001715class TaskWithJobKeyvals(object):
1716 """AgentTask mixin providing functionality to help with job keyval files."""
1717 _KEYVAL_FILE = 'keyval'
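    # Keyval files are plain "name=value" lines, one keyval per line, e.g.
    # (timestamp values below are illustrative):
    #   job_queued=1260000000
    #   job_finished=1260003600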
1718 def _format_keyval(self, key, value):
1719 return '%s=%s' % (key, value)
1720
1721
1722 def _keyval_path(self):
1723 """Subclasses must override this"""
1724        raise NotImplementedError
1725
1726
1727 def _write_keyval_after_job(self, field, value):
1728 assert self.monitor
1729 if not self.monitor.has_process():
1730 return
1731 _drone_manager.write_lines_to_file(
1732 self._keyval_path(), [self._format_keyval(field, value)],
1733 paired_with_process=self.monitor.get_process())
1734
1735
1736 def _job_queued_keyval(self, job):
1737 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1738
1739
1740 def _write_job_finished(self):
1741 self._write_keyval_after_job("job_finished", int(time.time()))
1742
1743
showarddb502762009-09-09 15:31:20 +00001744 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1745 keyval_contents = '\n'.join(self._format_keyval(key, value)
1746 for key, value in keyval_dict.iteritems())
1747 # always end with a newline to allow additional keyvals to be written
1748 keyval_contents += '\n'
showard493beaa2009-12-18 22:44:45 +00001749 _drone_manager.attach_file_to_execution(self._working_directory(),
showarddb502762009-09-09 15:31:20 +00001750 keyval_contents,
1751 file_path=keyval_path)
1752
1753
1754 def _write_keyvals_before_job(self, keyval_dict):
1755 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1756
1757
1758 def _write_host_keyvals(self, host):
showardd1195652009-12-08 22:21:02 +00001759 keyval_path = os.path.join(self._working_directory(), 'host_keyvals',
showarddb502762009-09-09 15:31:20 +00001760 host.hostname)
1761 platform, all_labels = host.platform_and_labels()
1762 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1763 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
1764
1765
showard8cc058f2009-09-08 16:26:33 +00001766class SpecialAgentTask(AgentTask, TaskWithJobKeyvals):
showarded2afea2009-07-07 20:54:07 +00001767 """
1768 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1769 """
1770
1771 TASK_TYPE = None
1772 host = None
1773 queue_entry = None
1774
showardd1195652009-12-08 22:21:02 +00001775 def __init__(self, task, extra_command_args):
1776 super(SpecialAgentTask, self).__init__()
1777
showarded2afea2009-07-07 20:54:07 +00001778        assert self.TASK_TYPE is not None, (
1779                'self.TASK_TYPE must be overridden')
showard8cc058f2009-09-08 16:26:33 +00001780
1781 self.host = Host(id=task.host.id)
1782 self.queue_entry = None
1783 if task.queue_entry:
1784 self.queue_entry = HostQueueEntry(id=task.queue_entry.id)
1785
showarded2afea2009-07-07 20:54:07 +00001786 self.task = task
1787 self._extra_command_args = extra_command_args
showarded2afea2009-07-07 20:54:07 +00001788
1789
showard8cc058f2009-09-08 16:26:33 +00001790 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00001791 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
1792
1793
1794 def _command_line(self):
1795 return _autoserv_command_line(self.host.hostname,
1796 self._extra_command_args,
1797 queue_entry=self.queue_entry)
1798
1799
1800 def _working_directory(self):
1801 return self.task.execution_path()
1802
1803
1804 @property
1805 def owner_username(self):
1806 if self.task.requested_by:
1807 return self.task.requested_by.login
1808 return None
showard8cc058f2009-09-08 16:26:33 +00001809
1810
showarded2afea2009-07-07 20:54:07 +00001811 def prolog(self):
1812 super(SpecialAgentTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001813 self.task.activate()
showarddb502762009-09-09 15:31:20 +00001814 self._write_host_keyvals(self.host)
showarded2afea2009-07-07 20:54:07 +00001815
1816
showardde634ee2009-01-30 01:44:24 +00001817 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001818 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001819
showard2fe3f1d2009-07-06 20:19:11 +00001820 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001821 return # don't fail metahost entries, they'll be reassigned
1822
showard2fe3f1d2009-07-06 20:19:11 +00001823 self.queue_entry.update_from_database()
showard8cc058f2009-09-08 16:26:33 +00001824 if self.queue_entry.status != models.HostQueueEntry.Status.QUEUED:
showardccbd6c52009-03-21 00:10:21 +00001825 return # entry has been aborted
1826
showard2fe3f1d2009-07-06 20:19:11 +00001827 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001828 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001829 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001830 self._write_keyval_after_job(queued_key, queued_time)
1831 self._write_job_finished()
showardcdaeae82009-08-31 18:32:48 +00001832
showard8cc058f2009-09-08 16:26:33 +00001833 # copy results logs into the normal place for job results
showardcdaeae82009-08-31 18:32:48 +00001834 self.monitor.try_copy_results_on_drone(
showardd1195652009-12-08 22:21:02 +00001835 source_path=self._working_directory() + '/',
showardcdaeae82009-08-31 18:32:48 +00001836 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001837
showard2fe3f1d2009-07-06 20:19:11 +00001838 self._copy_results([self.queue_entry])
showardd1195652009-12-08 22:21:02 +00001839
1840 if not self.queue_entry.job.parse_failed_repair:
1841 self.queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
1842 return
showard8cc058f2009-09-08 16:26:33 +00001843
1844 pidfile_id = _drone_manager.get_pidfile_id_from(
1845 self.queue_entry.execution_path(),
1846 pidfile_name=_AUTOSERV_PID_FILE)
1847 _drone_manager.register_pidfile(pidfile_id)
showardd1195652009-12-08 22:21:02 +00001848 self._parse_results([self.queue_entry])
showard8cc058f2009-09-08 16:26:33 +00001849
1850
1851 def cleanup(self):
1852 super(SpecialAgentTask, self).cleanup()
showarde60e44e2009-11-13 20:45:38 +00001853
1854 # We will consider an aborted task to be "Failed"
1855 self.task.finish(bool(self.success))
1856
showardf85a0b72009-10-07 20:48:45 +00001857 if self.monitor:
1858 if self.monitor.has_process():
1859 self._copy_results([self.task])
1860 if self.monitor.pidfile_id is not None:
1861 _drone_manager.unregister_pidfile(self.monitor.pidfile_id)
showard8cc058f2009-09-08 16:26:33 +00001862
1863
1864class RepairTask(SpecialAgentTask):
1865 TASK_TYPE = models.SpecialTask.Task.REPAIR
1866
1867
showardd1195652009-12-08 22:21:02 +00001868 def __init__(self, task):
showard8cc058f2009-09-08 16:26:33 +00001869 """\
1870        task: SpecialTask entry describing this repair; if it has an
                associated queue entry, that entry is marked failed when the
                repair fails.
1871 """
1872 protection = host_protections.Protection.get_string(
1873 task.host.protection)
1874 # normalize the protection name
1875 protection = host_protections.Protection.get_attr_name(protection)
1876
1877 super(RepairTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00001878 task, ['-R', '--host-protection', protection])
showard8cc058f2009-09-08 16:26:33 +00001879
1880 # *don't* include the queue entry in IDs -- if the queue entry is
1881 # aborted, we want to leave the repair task running
1882 self._set_ids(host=self.host)
1883
1884
1885 def prolog(self):
1886 super(RepairTask, self).prolog()
1887 logging.info("repair_task starting")
1888 self.host.set_status(models.Host.Status.REPAIRING)
showardde634ee2009-01-30 01:44:24 +00001889
1890
jadmanski0afbb632008-06-06 21:10:57 +00001891 def epilog(self):
1892 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001893
jadmanski0afbb632008-06-06 21:10:57 +00001894 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001895 self.host.set_status(models.Host.Status.READY)
jadmanski0afbb632008-06-06 21:10:57 +00001896 else:
showard8cc058f2009-09-08 16:26:33 +00001897 self.host.set_status(models.Host.Status.REPAIR_FAILED)
showard2fe3f1d2009-07-06 20:19:11 +00001898 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001899 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001900
1901
showarded2afea2009-07-07 20:54:07 +00001902class PreJobTask(SpecialAgentTask):
showard775300b2009-09-09 15:30:50 +00001903 def _copy_to_results_repository(self):
1904 if not self.queue_entry or self.queue_entry.meta_host:
1905 return
1906
1907 self.queue_entry.set_execution_subdir()
1908 log_name = os.path.basename(self.task.execution_path())
1909 source = os.path.join(self.task.execution_path(), 'debug',
1910 'autoserv.DEBUG')
1911 destination = os.path.join(
1912 self.queue_entry.execution_path(), log_name)
1913
1914 self.monitor.try_copy_to_results_repository(
1915 source, destination_path=destination)
1916
1917
showard170873e2009-01-07 00:22:26 +00001918 def epilog(self):
1919 super(PreJobTask, self).epilog()
showardcdaeae82009-08-31 18:32:48 +00001920
showard775300b2009-09-09 15:30:50 +00001921 if self.success:
1922 return
showard8fe93b52008-11-18 17:53:22 +00001923
showard775300b2009-09-09 15:30:50 +00001924 self._copy_to_results_repository()
showard8cc058f2009-09-08 16:26:33 +00001925
showard775300b2009-09-09 15:30:50 +00001926 if self.host.protection == host_protections.Protection.DO_NOT_VERIFY:
showard7b2d7cb2009-10-28 19:53:03 +00001927 # effectively ignore failure for these hosts
1928 self.success = True
showard775300b2009-09-09 15:30:50 +00001929 return
1930
1931 if self.queue_entry:
1932 self.queue_entry.requeue()
1933
1934 if models.SpecialTask.objects.filter(
showard8cc058f2009-09-08 16:26:33 +00001935 task=models.SpecialTask.Task.REPAIR,
showard775300b2009-09-09 15:30:50 +00001936 queue_entry__id=self.queue_entry.id):
1937 self.host.set_status(models.Host.Status.REPAIR_FAILED)
1938 self._fail_queue_entry()
1939 return
1940
showard9bb960b2009-11-19 01:02:11 +00001941 queue_entry = models.HostQueueEntry.objects.get(
1942 id=self.queue_entry.id)
showard775300b2009-09-09 15:30:50 +00001943 else:
1944 queue_entry = None
1945
1946 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00001947 host=models.Host.objects.get(id=self.host.id),
showard775300b2009-09-09 15:30:50 +00001948 task=models.SpecialTask.Task.REPAIR,
showard9bb960b2009-11-19 01:02:11 +00001949 queue_entry=queue_entry,
1950 requested_by=self.task.requested_by)
showard58721a82009-08-20 23:32:40 +00001951
showard8fe93b52008-11-18 17:53:22 +00001952
1953class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001954 TASK_TYPE = models.SpecialTask.Task.VERIFY
1955
1956
showardd1195652009-12-08 22:21:02 +00001957 def __init__(self, task):
1958 super(VerifyTask, self).__init__(task, ['-v'])
showard8cc058f2009-09-08 16:26:33 +00001959 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mblighe2586682008-02-29 22:45:46 +00001960
1961
jadmanski0afbb632008-06-06 21:10:57 +00001962 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001963 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001964
showardb18134f2009-03-20 20:52:18 +00001965 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001966 if self.queue_entry:
showard8cc058f2009-09-08 16:26:33 +00001967 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
1968 self.host.set_status(models.Host.Status.VERIFYING)
mbligh36768f02008-02-22 18:28:33 +00001969
showarded2afea2009-07-07 20:54:07 +00001970 # Delete any other queued verifies for this host. One verify will do
1971 # and there's no need to keep records of other requests.
1972 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001973 host__id=self.host.id,
1974 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001975 is_active=False, is_complete=False)
1976 queued_verifies = queued_verifies.exclude(id=self.task.id)
1977 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001978
mbligh36768f02008-02-22 18:28:33 +00001979
jadmanski0afbb632008-06-06 21:10:57 +00001980 def epilog(self):
1981 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001982 if self.success:
showard8cc058f2009-09-08 16:26:33 +00001983 if self.queue_entry:
1984 self.queue_entry.on_pending()
1985 else:
1986 self.host.set_status(models.Host.Status.READY)
mbligh36768f02008-02-22 18:28:33 +00001987
1988
showarda9545c02009-12-18 22:44:26 +00001989class AbstractQueueTask(AgentTask, TaskWithJobKeyvals):
1990 """
1991 Common functionality for QueueTask and HostlessQueueTask
1992 """
1993 def __init__(self, queue_entries):
1994 super(AbstractQueueTask, self).__init__()
showardd1195652009-12-08 22:21:02 +00001995 self.job = queue_entries[0].job
jadmanski0afbb632008-06-06 21:10:57 +00001996 self.queue_entries = queue_entries
mbligh36768f02008-02-22 18:28:33 +00001997
1998
showard73ec0442009-02-07 02:05:20 +00001999 def _keyval_path(self):
showardd1195652009-12-08 22:21:02 +00002000 return os.path.join(self._working_directory(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00002001
2002
showardd1195652009-12-08 22:21:02 +00002003 def _command_line(self):
2004 return self.job.get_autoserv_params(self.queue_entries)
2005
2006
2007 @property
2008 def num_processes(self):
2009 return len(self.queue_entries)
2010
2011
2012 @property
2013 def owner_username(self):
2014 return self.job.owner
2015
2016
2017 def _working_directory(self):
2018 return self._get_consistent_execution_path(self.queue_entries)
mblighbb421852008-03-11 22:36:16 +00002019
2020
jadmanski0afbb632008-06-06 21:10:57 +00002021 def prolog(self):
showardd9205182009-04-27 20:09:55 +00002022 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00002023 keyval_dict = {queued_key: queued_time}
showardd1195652009-12-08 22:21:02 +00002024 group_name = self.queue_entries[0].get_group_name()
2025 if group_name:
2026 keyval_dict['host_group_name'] = group_name
showardf1ae3542009-05-11 19:26:02 +00002027 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00002028 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002029 queue_entry.set_status(models.HostQueueEntry.Status.RUNNING)
showarda9545c02009-12-18 22:44:26 +00002030 queue_entry.set_started_on_now()
mbligh36768f02008-02-22 18:28:33 +00002031
2032
showard35162b02009-03-03 02:17:30 +00002033 def _write_lost_process_error_file(self):
showardd1195652009-12-08 22:21:02 +00002034 error_file_path = os.path.join(self._working_directory(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00002035 _drone_manager.write_lines_to_file(error_file_path,
2036 [_LOST_PROCESS_ERROR])
2037
2038
showardd3dc1992009-04-22 21:01:40 +00002039 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00002040 if not self.monitor:
2041 return
2042
showardd9205182009-04-27 20:09:55 +00002043 self._write_job_finished()
2044
showard35162b02009-03-03 02:17:30 +00002045 if self.monitor.lost_process:
2046 self._write_lost_process_error_file()
showard4ac47542009-08-31 18:32:19 +00002047
jadmanskif7fa2cc2008-10-01 14:13:23 +00002048
showardcbd74612008-11-19 21:42:02 +00002049 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00002050 _drone_manager.write_lines_to_file(
showardd1195652009-12-08 22:21:02 +00002051 os.path.join(self._working_directory(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00002052 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00002053 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00002054
2055
jadmanskif7fa2cc2008-10-01 14:13:23 +00002056 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00002057 if not self.monitor or not self.monitor.has_process():
2058 return
2059
jadmanskif7fa2cc2008-10-01 14:13:23 +00002060 # build up sets of all the aborted_by and aborted_on values
2061 aborted_by, aborted_on = set(), set()
2062 for queue_entry in self.queue_entries:
2063 if queue_entry.aborted_by:
2064 aborted_by.add(queue_entry.aborted_by)
2065 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
2066 aborted_on.add(t)
2067
2068 # extract some actual, unique aborted by value and write it out
2069 assert len(aborted_by) <= 1
2070 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00002071 aborted_by_value = aborted_by.pop()
2072 aborted_on_value = max(aborted_on)
2073 else:
2074 aborted_by_value = 'autotest_system'
2075 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00002076
showarda0382352009-02-11 23:36:43 +00002077 self._write_keyval_after_job("aborted_by", aborted_by_value)
2078 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00002079
showardcbd74612008-11-19 21:42:02 +00002080 aborted_on_string = str(datetime.datetime.fromtimestamp(
2081 aborted_on_value))
2082 self._write_status_comment('Job aborted by %s on %s' %
2083 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00002084
2085
jadmanski0afbb632008-06-06 21:10:57 +00002086 def abort(self):
showarda9545c02009-12-18 22:44:26 +00002087 super(AbstractQueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00002088 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00002089 self._finish_task()
showard21baa452008-10-21 00:08:39 +00002090
2091
jadmanski0afbb632008-06-06 21:10:57 +00002092 def epilog(self):
showarda9545c02009-12-18 22:44:26 +00002093 super(AbstractQueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002094 self._finish_task()
showarda9545c02009-12-18 22:44:26 +00002095
2096
2097class QueueTask(AbstractQueueTask):
2098 def __init__(self, queue_entries):
2099 super(QueueTask, self).__init__(queue_entries)
2100 self._set_ids(queue_entries=queue_entries)
2101
2102
2103 def prolog(self):
2104 for entry in self.queue_entries:
2105 if entry.status not in (models.HostQueueEntry.Status.STARTING,
2106 models.HostQueueEntry.Status.RUNNING):
2107 raise SchedulerError('Queue task attempting to start '
2108 'entry with invalid status %s: %s'
2109 % (entry.status, entry))
2110 if entry.host.status not in (models.Host.Status.PENDING,
2111 models.Host.Status.RUNNING):
2112 raise SchedulerError('Queue task attempting to start on queue '
2113 'entry with invalid host status %s: %s'
2114 % (entry.host.status, entry))
2115
2116 super(QueueTask, self).prolog()
2117
2118 for queue_entry in self.queue_entries:
2119 self._write_host_keyvals(queue_entry.host)
2120 queue_entry.host.set_status(models.Host.Status.RUNNING)
2121 queue_entry.host.update_field('dirty', 1)
2122 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
2123 # TODO(gps): Remove this if nothing needs it anymore.
2124 # A potential user is: tko/parser
2125 self.job.write_to_machines_file(self.queue_entries[0])
2126
2127
2128 def _finish_task(self):
2129 super(QueueTask, self)._finish_task()
2130
2131 for queue_entry in self.queue_entries:
2132 queue_entry.set_status(models.HostQueueEntry.Status.GATHERING)
mbligh36768f02008-02-22 18:28:33 +00002133
2134
showardd3dc1992009-04-22 21:01:40 +00002135class PostJobTask(AgentTask):
showardd1195652009-12-08 22:21:02 +00002136 def __init__(self, queue_entries, log_file_name):
2137 super(PostJobTask, self).__init__(log_file_name=log_file_name)
showardd3dc1992009-04-22 21:01:40 +00002138
showardd1195652009-12-08 22:21:02 +00002139 self.queue_entries = queue_entries
2140
showardd3dc1992009-04-22 21:01:40 +00002141 self._autoserv_monitor = PidfileRunMonitor()
showardd1195652009-12-08 22:21:02 +00002142 self._autoserv_monitor.attach_to_existing_process(
2143 self._working_directory())
showardd3dc1992009-04-22 21:01:40 +00002144
showardd1195652009-12-08 22:21:02 +00002145
2146 def _command_line(self):
showardd3dc1992009-04-22 21:01:40 +00002147 if _testing_mode:
showardd1195652009-12-08 22:21:02 +00002148 return 'true'
2149 return self._generate_command(
2150 _drone_manager.absolute_path(self._working_directory()))
showardd3dc1992009-04-22 21:01:40 +00002151
2152
2153 def _generate_command(self, results_dir):
2154 raise NotImplementedError('Subclasses must override this')
2155
2156
showardd1195652009-12-08 22:21:02 +00002157 @property
2158 def owner_username(self):
2159 return self.queue_entries[0].job.owner
2160
2161
2162 def _working_directory(self):
2163 return self._get_consistent_execution_path(self.queue_entries)
2164
2165
2166 def _paired_with_monitor(self):
2167 return self._autoserv_monitor
2168
2169
showardd3dc1992009-04-22 21:01:40 +00002170 def _job_was_aborted(self):
2171 was_aborted = None
showardd1195652009-12-08 22:21:02 +00002172 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002173 queue_entry.update_from_database()
2174 if was_aborted is None: # first queue entry
2175 was_aborted = bool(queue_entry.aborted)
2176 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
2177 email_manager.manager.enqueue_notify_email(
2178 'Inconsistent abort state',
2179 'Queue entries have inconsistent abort state: ' +
2180                    ', '.join('%s (%s)' % (entry, entry.aborted)
                                  for entry in self.queue_entries))
2181 # don't crash here, just assume true
2182 return True
2183 return was_aborted
2184
2185
showardd1195652009-12-08 22:21:02 +00002186 def _final_status(self):
showardd3dc1992009-04-22 21:01:40 +00002187 if self._job_was_aborted():
2188 return models.HostQueueEntry.Status.ABORTED
2189
2190 # we'll use a PidfileRunMonitor to read the autoserv exit status
2191 if self._autoserv_monitor.exit_code() == 0:
2192 return models.HostQueueEntry.Status.COMPLETED
2193 return models.HostQueueEntry.Status.FAILED
2194
2195
showardd3dc1992009-04-22 21:01:40 +00002196 def _set_all_statuses(self, status):
showardd1195652009-12-08 22:21:02 +00002197 for queue_entry in self.queue_entries:
showardd3dc1992009-04-22 21:01:40 +00002198 queue_entry.set_status(status)
2199
2200
2201 def abort(self):
2202 # override AgentTask.abort() to avoid killing the process and ending
2203 # the task. post-job tasks continue when the job is aborted.
2204 pass
2205
2206
showard9bb960b2009-11-19 01:02:11 +00002207class GatherLogsTask(PostJobTask):
showardd3dc1992009-04-22 21:01:40 +00002208 """
2209 Task responsible for
2210 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2211 * copying logs to the results repository
2212 * spawning CleanupTasks for hosts, if necessary
2213 * spawning a FinalReparseTask for the job
2214 """
showardd1195652009-12-08 22:21:02 +00002215 def __init__(self, queue_entries, recover_run_monitor=None):
2216 self._job = queue_entries[0].job
showardd3dc1992009-04-22 21:01:40 +00002217 super(GatherLogsTask, self).__init__(
showardd1195652009-12-08 22:21:02 +00002218 queue_entries, log_file_name='.collect_crashinfo.log')
showardd3dc1992009-04-22 21:01:40 +00002219 self._set_ids(queue_entries=queue_entries)
2220
2221
2222 def _generate_command(self, results_dir):
2223 host_list = ','.join(queue_entry.host.hostname
showardd1195652009-12-08 22:21:02 +00002224 for queue_entry in self.queue_entries)
showardd3dc1992009-04-22 21:01:40 +00002225 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2226 '-r', results_dir]
2227
2228
showardd1195652009-12-08 22:21:02 +00002229 @property
2230 def num_processes(self):
2231 return len(self.queue_entries)
2232
2233
2234 def _pidfile_name(self):
2235 return _CRASHINFO_PID_FILE
2236
2237
showardd3dc1992009-04-22 21:01:40 +00002238 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002239 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002240 if queue_entry.status != models.HostQueueEntry.Status.GATHERING:
2241 raise SchedulerError('Gather task attempting to start on '
2242 'non-gathering entry: %s' % queue_entry)
2243 if queue_entry.host.status != models.Host.Status.RUNNING:
2244 raise SchedulerError('Gather task attempting to start on queue '
showard0c5c18d2009-09-24 22:20:45 +00002245 'entry with non-running host status %s: %s'
2246 % (queue_entry.host.status, queue_entry))
showard8cc058f2009-09-08 16:26:33 +00002247
showardd3dc1992009-04-22 21:01:40 +00002248 super(GatherLogsTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002249
2250
showardd3dc1992009-04-22 21:01:40 +00002251 def epilog(self):
2252 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002253
showardd1195652009-12-08 22:21:02 +00002254 self._copy_and_parse_results(self.queue_entries,
showard6d1c1432009-08-20 23:30:39 +00002255 use_monitor=self._autoserv_monitor)
showard9bb960b2009-11-19 01:02:11 +00002256 self._reboot_hosts()
showard6d1c1432009-08-20 23:30:39 +00002257
showard9bb960b2009-11-19 01:02:11 +00002258
2259 def _reboot_hosts(self):
showard6d1c1432009-08-20 23:30:39 +00002260 if self._autoserv_monitor.has_process():
showardd1195652009-12-08 22:21:02 +00002261 final_success = (self._final_status() ==
showard6d1c1432009-08-20 23:30:39 +00002262 models.HostQueueEntry.Status.COMPLETED)
2263 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2264 else:
2265 final_success = False
2266 num_tests_failed = 0
2267
showard9bb960b2009-11-19 01:02:11 +00002268 reboot_after = self._job.reboot_after
2269 do_reboot = (
2270 # always reboot after aborted jobs
showardd1195652009-12-08 22:21:02 +00002271 self._final_status() == models.HostQueueEntry.Status.ABORTED
showard9bb960b2009-11-19 01:02:11 +00002272 or reboot_after == models.RebootAfter.ALWAYS
2273 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
2274 and final_success and num_tests_failed == 0))
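        # e.g. with reboot_after == RebootAfter.IF_ALL_TESTS_PASSED, a
        # completed run with num_tests_failed == 0 schedules a CLEANUP special
        # task for each host below; otherwise the host is returned to READY.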
2275
showardd1195652009-12-08 22:21:02 +00002276 for queue_entry in self.queue_entries:
showard9bb960b2009-11-19 01:02:11 +00002277 if do_reboot:
2278 # don't pass the queue entry to the CleanupTask. if the cleanup
2279 # fails, the job doesn't care -- it's over.
2280 models.SpecialTask.objects.create(
2281 host=models.Host.objects.get(id=queue_entry.host.id),
2282 task=models.SpecialTask.Task.CLEANUP,
2283 requested_by=self._job.owner_model())
2284 else:
2285 queue_entry.host.set_status(models.Host.Status.READY)
showardd3dc1992009-04-22 21:01:40 +00002286
2287
showard0bbfc212009-04-29 21:06:13 +00002288 def run(self):
showard597bfd32009-05-08 18:22:50 +00002289 autoserv_exit_code = self._autoserv_monitor.exit_code()
2290 # only run if Autoserv exited due to some signal. if we have no exit
2291 # code, assume something bad (and signal-like) happened.
2292 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002293 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002294 else:
2295 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002296
2297
showard8fe93b52008-11-18 17:53:22 +00002298class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002299 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2300
2301
showard8cc058f2009-09-08 16:26:33 +00002302 def __init__(self, task, recover_run_monitor=None):
showardd1195652009-12-08 22:21:02 +00002303 super(CleanupTask, self).__init__(task, ['--cleanup'])
showard8cc058f2009-09-08 16:26:33 +00002304 self._set_ids(host=self.host, queue_entries=[self.queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002305
mblighd5c95802008-03-05 00:33:46 +00002306
jadmanski0afbb632008-06-06 21:10:57 +00002307 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002308 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002309 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard8cc058f2009-09-08 16:26:33 +00002310 self.host.set_status(models.Host.Status.CLEANING)
2311 if self.queue_entry:
2312 self.queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2313
2314
showard775300b2009-09-09 15:30:50 +00002315 def _finish_epilog(self):
showard7b2d7cb2009-10-28 19:53:03 +00002316 if not self.queue_entry or not self.success:
showard775300b2009-09-09 15:30:50 +00002317 return
2318
showard7b2d7cb2009-10-28 19:53:03 +00002319 do_not_verify_protection = host_protections.Protection.DO_NOT_VERIFY
2320 should_run_verify = (
2321 self.queue_entry.job.run_verify
2322 and self.host.protection != do_not_verify_protection)
2323 if should_run_verify:
showard9bb960b2009-11-19 01:02:11 +00002324 entry = models.HostQueueEntry.objects.get(id=self.queue_entry.id)
showard7b2d7cb2009-10-28 19:53:03 +00002325 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00002326 host=models.Host.objects.get(id=self.host.id),
showard7b2d7cb2009-10-28 19:53:03 +00002327 queue_entry=entry,
2328 task=models.SpecialTask.Task.VERIFY)
2329 else:
showard775300b2009-09-09 15:30:50 +00002330 self.queue_entry.on_pending()
mblighd5c95802008-03-05 00:33:46 +00002331
mblighd5c95802008-03-05 00:33:46 +00002332
showard21baa452008-10-21 00:08:39 +00002333 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002334 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002335
showard21baa452008-10-21 00:08:39 +00002336 if self.success:
2337 self.host.update_field('dirty', 0)
showard775300b2009-09-09 15:30:50 +00002338 self.host.set_status(models.Host.Status.READY)
showard21baa452008-10-21 00:08:39 +00002339
showard775300b2009-09-09 15:30:50 +00002340 self._finish_epilog()
showard8cc058f2009-09-08 16:26:33 +00002341
showard21baa452008-10-21 00:08:39 +00002342
showardd3dc1992009-04-22 21:01:40 +00002343class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002344 _num_running_parses = 0
2345
showardd1195652009-12-08 22:21:02 +00002346 def __init__(self, queue_entries):
2347 super(FinalReparseTask, self).__init__(queue_entries,
2348 log_file_name='.parse.log')
showard170873e2009-01-07 00:22:26 +00002349 # don't use _set_ids, since we don't want to set the host_ids
2350 self.queue_entry_ids = [entry.id for entry in queue_entries]
showardd1195652009-12-08 22:21:02 +00002351
2352
2353 def _generate_command(self, results_dir):
2354 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
2355 results_dir]
2356
2357
2358 @property
2359 def num_processes(self):
2360 return 0 # don't include parser processes in accounting
2361
2362
2363 def _pidfile_name(self):
2364 return _PARSER_PID_FILE
2365
2366
2367 def _parse_started(self):
2368 return bool(self.monitor)
showard97aed502008-11-04 02:01:24 +00002369
showard97aed502008-11-04 02:01:24 +00002370
2371 @classmethod
2372 def _increment_running_parses(cls):
2373 cls._num_running_parses += 1
2374
2375
2376 @classmethod
2377 def _decrement_running_parses(cls):
2378 cls._num_running_parses -= 1
2379
2380
2381 @classmethod
2382 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002383 return (cls._num_running_parses <
2384 scheduler_config.config.max_parse_processes)
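    # e.g. with max_parse_processes == 5 (illustrative), a sixth
    # FinalReparseTask keeps retrying in tick() via _try_starting_parse()
    # until one of the running parses finishes and decrements the
    # class-level counter.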
showard97aed502008-11-04 02:01:24 +00002385
2386
2387 def prolog(self):
showardd1195652009-12-08 22:21:02 +00002388 for queue_entry in self.queue_entries:
showard8cc058f2009-09-08 16:26:33 +00002389 if queue_entry.status != models.HostQueueEntry.Status.PARSING:
2390 raise SchedulerError('Parse task attempting to start on '
2391 'non-parsing entry: %s' % queue_entry)
2392
showard97aed502008-11-04 02:01:24 +00002393 super(FinalReparseTask, self).prolog()
showard97aed502008-11-04 02:01:24 +00002394
2395
2396 def epilog(self):
2397 super(FinalReparseTask, self).epilog()
showardd1195652009-12-08 22:21:02 +00002398 self._set_all_statuses(self._final_status())
showard97aed502008-11-04 02:01:24 +00002399
2400
showard08a36412009-05-05 01:01:13 +00002401 def tick(self):
2402 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002403 # and we can, at which point we revert to default behavior
showardd1195652009-12-08 22:21:02 +00002404 if self._parse_started():
showard08a36412009-05-05 01:01:13 +00002405 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002406 else:
2407 self._try_starting_parse()
2408
2409
2410 def run(self):
2411 # override run() to not actually run unless we can
2412 self._try_starting_parse()
2413
2414
2415 def _try_starting_parse(self):
2416 if not self._can_run_new_parse():
2417 return
showard170873e2009-01-07 00:22:26 +00002418
showard97aed502008-11-04 02:01:24 +00002419 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002420 super(FinalReparseTask, self).run()
showard97aed502008-11-04 02:01:24 +00002421 self._increment_running_parses()
showard97aed502008-11-04 02:01:24 +00002422
2423
2424 def finished(self, success):
2425 super(FinalReparseTask, self).finished(success)
showardd1195652009-12-08 22:21:02 +00002426 if self._parse_started():
showard678df4f2009-02-04 21:36:39 +00002427 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002428
2429
showarda9545c02009-12-18 22:44:26 +00002430class HostlessQueueTask(AbstractQueueTask):
2431 def __init__(self, queue_entry):
2432 super(HostlessQueueTask, self).__init__([queue_entry])
2433 self.queue_entry_ids = [queue_entry.id]
2434
2435
2436 def prolog(self):
2437 self.queue_entries[0].update_field('execution_subdir', 'hostless')
2438 super(HostlessQueueTask, self).prolog()
2439
2440
2441 def _final_status(self):
2442 if self.queue_entries[0].aborted:
2443 return models.HostQueueEntry.Status.ABORTED
2444 if self.monitor.exit_code() == 0:
2445 return models.HostQueueEntry.Status.COMPLETED
2446 return models.HostQueueEntry.Status.FAILED
2447
2448
2449 def _finish_task(self):
2450 super(HostlessQueueTask, self)._finish_task()
2451 self.queue_entries[0].set_status(self._final_status())
2452
2453
showarda3c58572009-03-12 20:36:59 +00002454class DBError(Exception):
2455 """Raised by the DBObject constructor when its select fails."""
2456
2457
mbligh36768f02008-02-22 18:28:33 +00002458class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002459 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002460
2461 # Subclasses MUST override these:
2462 _table_name = ''
2463 _fields = ()
2464
showarda3c58572009-03-12 20:36:59 +00002465 # A mapping from (type, id) to the instance of the object for that
2466 # particular id. This prevents us from creating new Job() and Host()
2467 # instances for every HostQueueEntry object that we instantiate as
2468 # multiple HQEs often share the same Job.
2469 _instances_by_type_and_id = weakref.WeakValueDictionary()
2470 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002471
showarda3c58572009-03-12 20:36:59 +00002472
2473 def __new__(cls, id=None, **kwargs):
2474 """
2475 Look to see if we already have an instance for this particular type
2476 and id. If so, use it instead of creating a duplicate instance.
2477 """
2478 if id is not None:
2479 instance = cls._instances_by_type_and_id.get((cls, id))
2480 if instance:
2481 return instance
2482 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2483
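    # A sketch of how the instance cache above behaves (ids are hypothetical;
    # assumes the global _db connection has been initialized, as in the
    # scheduler's startup path):
    #
    #     host_a = Host(id=42)
    #     host_b = Host(id=42)    # same object, served from the cache
    #     assert host_a is host_b
    #
    #     hqe = HostQueueEntry(id=7)
    #     assert hqe.job is Job(hqe.job_id)   # Jobs are shared between HQEs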
2484
2485 def __init__(self, id=None, row=None, new_record=False, always_query=True):
showard8cc058f2009-09-08 16:26:33 +00002486 assert bool(id) or bool(row)
2487 if id is not None and row is not None:
2488 assert id == row[0]
showard6ae5ea92009-02-25 00:11:51 +00002489 assert self._table_name, '_table_name must be defined in your class'
2490 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002491 if not new_record:
2492 if self._initialized and not always_query:
2493 return # We've already been initialized.
2494 if id is None:
2495 id = row[0]
2496 # Tell future constructors to use us instead of re-querying while
2497 # this instance is still around.
2498 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002499
showard6ae5ea92009-02-25 00:11:51 +00002500 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002501
jadmanski0afbb632008-06-06 21:10:57 +00002502 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002503
jadmanski0afbb632008-06-06 21:10:57 +00002504 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002505 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002506
showarda3c58572009-03-12 20:36:59 +00002507 if self._initialized:
2508 differences = self._compare_fields_in_row(row)
2509 if differences:
showard7629f142009-03-27 21:02:02 +00002510 logging.warn(
2511 'initialized %s %s instance requery is updating: %s',
2512 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002513 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002514 self._initialized = True
2515
2516
2517 @classmethod
2518 def _clear_instance_cache(cls):
2519 """Used for testing, clear the internal instance cache."""
2520 cls._instances_by_type_and_id.clear()
2521
2522
showardccbd6c52009-03-21 00:10:21 +00002523 def _fetch_row_from_db(self, row_id):
2524 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2525 rows = _db.execute(sql, (row_id,))
2526 if not rows:
showard76e29d12009-04-15 21:53:10 +00002527 raise DBError("row not found (table=%s, row id=%s)"
2528 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002529 return rows[0]
2530
2531
showarda3c58572009-03-12 20:36:59 +00002532 def _assert_row_length(self, row):
2533 assert len(row) == len(self._fields), (
2534 "table = %s, row = %s/%d, fields = %s/%d" % (
2535 self.__table, row, len(row), self._fields, len(self._fields)))
2536
2537
2538 def _compare_fields_in_row(self, row):
2539 """
        Given a row as returned by a SELECT query, compare it to our existing
        in-memory fields. Fractional seconds are stripped from datetime values
2542 before comparison.
showarda3c58572009-03-12 20:36:59 +00002543
2544 @param row - A sequence of values corresponding to fields named in
                the class attribute _fields.
2546
2547 @returns A dictionary listing the differences keyed by field name
2548 containing tuples of (current_value, row_value).
2549 """
2550 self._assert_row_length(row)
2551 differences = {}
showarddae680a2009-10-12 20:26:43 +00002552 datetime_cmp_fmt = '%Y-%m-%d %H:%M:%S' # Leave off the microseconds.
showarda3c58572009-03-12 20:36:59 +00002553 for field, row_value in itertools.izip(self._fields, row):
2554 current_value = getattr(self, field)
showarddae680a2009-10-12 20:26:43 +00002555 if (isinstance(current_value, datetime.datetime)
2556 and isinstance(row_value, datetime.datetime)):
2557 current_value = current_value.strftime(datetime_cmp_fmt)
2558 row_value = row_value.strftime(datetime_cmp_fmt)
showarda3c58572009-03-12 20:36:59 +00002559 if current_value != row_value:
2560 differences[field] = (current_value, row_value)
2561 return differences
showard2bab8f42008-11-12 18:15:22 +00002562
2563
2564 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002565 """
2566 Update our field attributes using a single row returned by SELECT.
2567
2568 @param row - A sequence of values corresponding to fields named in
2569 the class fields list.
2570 """
2571 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002572
showard2bab8f42008-11-12 18:15:22 +00002573 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002574 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002575 setattr(self, field, value)
2576 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002577
showard2bab8f42008-11-12 18:15:22 +00002578 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002579
mblighe2586682008-02-29 22:45:46 +00002580
showardccbd6c52009-03-21 00:10:21 +00002581 def update_from_database(self):
2582 assert self.id is not None
2583 row = self._fetch_row_from_db(self.id)
2584 self._update_fields_from_row(row)
2585
2586
jadmanski0afbb632008-06-06 21:10:57 +00002587 def count(self, where, table = None):
2588 if not table:
2589 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002590
jadmanski0afbb632008-06-06 21:10:57 +00002591 rows = _db.execute("""
2592 SELECT count(*) FROM %s
2593 WHERE %s
2594 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002595
jadmanski0afbb632008-06-06 21:10:57 +00002596 assert len(rows) == 1
2597
2598 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002599
2600
showardd3dc1992009-04-22 21:01:40 +00002601 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002602 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002603
showard2bab8f42008-11-12 18:15:22 +00002604 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002605 return
mbligh36768f02008-02-22 18:28:33 +00002606
mblighf8c624d2008-07-03 16:58:45 +00002607 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002608 _db.execute(query, (value, self.id))
2609
showard2bab8f42008-11-12 18:15:22 +00002610 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002611
2612
jadmanski0afbb632008-06-06 21:10:57 +00002613 def save(self):
2614 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002615 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002616 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002617 values = []
2618 for key in keys:
2619 value = getattr(self, key)
2620 if value is None:
2621 values.append('NULL')
2622 else:
2623 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002624 values_str = ','.join(values)
2625 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2626 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002627 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002628 # Update our id to the one the database just assigned to us.
2629 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002630
2631
jadmanski0afbb632008-06-06 21:10:57 +00002632 def delete(self):
        self._instances_by_type_and_id.pop((type(self), self.id), None)
2634 self._initialized = False
2635 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002636 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2637 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002638
2639
showard63a34772008-08-18 19:32:50 +00002640 @staticmethod
2641 def _prefix_with(string, prefix):
2642 if string:
2643 string = prefix + string
2644 return string
2645
2646
jadmanski0afbb632008-06-06 21:10:57 +00002647 @classmethod
showard989f25d2008-10-01 11:38:11 +00002648 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002649 """
2650 Construct instances of our class based on the given database query.
2651
        @returns A list of instances of this class, one for each row fetched.
2653 """
showard63a34772008-08-18 19:32:50 +00002654 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2655 where = cls._prefix_with(where, 'WHERE ')
2656 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002657 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002658 'joins' : joins,
2659 'where' : where,
2660 'order_by' : order_by})
2661 rows = _db.execute(query, params)
showard8cc058f2009-09-08 16:26:33 +00002662 return [cls(id=row[0], row=row) for row in rows]
mblighe2586682008-02-29 22:45:46 +00002663
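# Typical use of DBObject.fetch() elsewhere in this file, shown as a small
# sketch (the WHERE clause and variables here are hypothetical):
#
#     pending = HostQueueEntry.fetch(
#             where='job_id = %s AND status = "Pending"', params=(job.id,))
#     for entry in pending:
#         logging.info('pending entry: %s', entry)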
mbligh36768f02008-02-22 18:28:33 +00002664
2665class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002666 _table_name = 'ineligible_host_queues'
2667 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002668
2669
showard89f84db2009-03-12 20:39:13 +00002670class AtomicGroup(DBObject):
2671 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002672 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2673 'invalid')
showard89f84db2009-03-12 20:39:13 +00002674
2675
showard989f25d2008-10-01 11:38:11 +00002676class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002677 _table_name = 'labels'
2678 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002679 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002680
2681
showard6157c632009-07-06 20:19:31 +00002682 def __repr__(self):
2683 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2684 self.name, self.id, self.atomic_group_id)
2685
2686
mbligh36768f02008-02-22 18:28:33 +00002687class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002688 _table_name = 'hosts'
2689 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2690 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2691
2692
    def set_status(self, status):
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00002696
2697
showard170873e2009-01-07 00:22:26 +00002698 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002699 """
showard170873e2009-01-07 00:22:26 +00002700 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002701 """
2702 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002703 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002704 FROM labels
2705 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002706 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002707 ORDER BY labels.name
2708 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002709 platform = None
2710 all_labels = []
2711 for label_name, is_platform in rows:
2712 if is_platform:
2713 platform = label_name
2714 all_labels.append(label_name)
2715 return platform, all_labels
2716
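    # Example (label names are hypothetical): for a host labelled 'bluetooth',
    # 'netbook' (a platform) and 'wifi', platform_and_labels() returns
    # ('netbook', ['bluetooth', 'netbook', 'wifi']).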
2717
showard54c1ea92009-05-20 00:32:58 +00002718 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2719
2720
2721 @classmethod
2722 def cmp_for_sort(cls, a, b):
2723 """
2724 A comparison function for sorting Host objects by hostname.
2725
2726 This strips any trailing numeric digits, ignores leading 0s and
2727 compares hostnames by the leading name and the trailing digits as a
        number. If either hostname does not match this pattern, the two are
        simply compared as lower case strings.
2730
2731 Example of how hostnames will be sorted:
2732
2733 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2734
        This hopefully satisfies most people's hostname sorting needs regardless
2736 of their exact naming schemes. Nobody sane should have both a host10
2737 and host010 (but the algorithm works regardless).
2738 """
2739 lower_a = a.hostname.lower()
2740 lower_b = b.hostname.lower()
2741 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2742 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2743 if match_a and match_b:
2744 name_a, number_a_str = match_a.groups()
2745 name_b, number_b_str = match_b.groups()
2746 number_a = int(number_a_str.lstrip('0'))
2747 number_b = int(number_b_str.lstrip('0'))
2748 result = cmp((name_a, number_a), (name_b, number_b))
2749 if result == 0 and lower_a != lower_b:
2750 # If they compared equal above but the lower case names are
2751 # indeed different, don't report equality. abc012 != abc12.
2752 return cmp(lower_a, lower_b)
2753 return result
2754 else:
2755 return cmp(lower_a, lower_b)
2756
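    # A sketch of how entries can be ordered with the comparator above
    # (see _choose_group_to_run() below, which sorts Pending entries this way):
    #
    #     entries.sort(cmp=lambda a, b: Host.cmp_for_sort(a.host, b.host))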
2757
mbligh36768f02008-02-22 18:28:33 +00002758class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002759 _table_name = 'host_queue_entries'
2760 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002761 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002762 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002763
2764
showarda3c58572009-03-12 20:36:59 +00002765 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002766 assert id or row
showarda3c58572009-03-12 20:36:59 +00002767 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002768 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002769
jadmanski0afbb632008-06-06 21:10:57 +00002770 if self.host_id:
2771 self.host = Host(self.host_id)
2772 else:
2773 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002774
showard77182562009-06-10 00:16:05 +00002775 if self.atomic_group_id:
2776 self.atomic_group = AtomicGroup(self.atomic_group_id,
2777 always_query=False)
2778 else:
2779 self.atomic_group = None
2780
showard170873e2009-01-07 00:22:26 +00002781 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002782 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002783
2784
showard89f84db2009-03-12 20:39:13 +00002785 @classmethod
2786 def clone(cls, template):
2787 """
2788 Creates a new row using the values from a template instance.
2789
2790 The new instance will not exist in the database or have a valid
2791 id attribute until its save() method is called.
2792 """
2793 assert isinstance(template, cls)
2794 new_row = [getattr(template, field) for field in cls._fields]
2795 clone = cls(row=new_row, new_record=True)
2796 clone.id = None
2797 return clone
2798
2799
showardc85c21b2008-11-24 22:17:37 +00002800 def _view_job_url(self):
2801 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2802
2803
showardf1ae3542009-05-11 19:26:02 +00002804 def get_labels(self):
2805 """
2806 Get all labels associated with this host queue entry (either via the
2807 meta_host or as a job dependency label). The labels yielded are not
2808 guaranteed to be unique.
2809
2810 @yields Label instances associated with this host_queue_entry.
2811 """
2812 if self.meta_host:
2813 yield Label(id=self.meta_host, always_query=False)
2814 labels = Label.fetch(
2815 joins="JOIN jobs_dependency_labels AS deps "
2816 "ON (labels.id = deps.label_id)",
2817 where="deps.job_id = %d" % self.job.id)
2818 for label in labels:
2819 yield label
2820
2821
jadmanski0afbb632008-06-06 21:10:57 +00002822 def set_host(self, host):
2823 if host:
2824 self.queue_log_record('Assigning host ' + host.hostname)
2825 self.update_field('host_id', host.id)
2826 self.update_field('active', True)
2827 self.block_host(host.id)
2828 else:
2829 self.queue_log_record('Releasing host')
2830 self.unblock_host(self.host.id)
2831 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002832
jadmanski0afbb632008-06-06 21:10:57 +00002833 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002834
2835
jadmanski0afbb632008-06-06 21:10:57 +00002836 def queue_log_record(self, log_line):
2837 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002838 _drone_manager.write_lines_to_file(self.queue_log_path,
2839 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002840
2841
jadmanski0afbb632008-06-06 21:10:57 +00002842 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002843 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002844 row = [0, self.job.id, host_id]
2845 block = IneligibleHostQueue(row=row, new_record=True)
2846 block.save()
mblighe2586682008-02-29 22:45:46 +00002847
2848
jadmanski0afbb632008-06-06 21:10:57 +00002849 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002850 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002851 blocks = IneligibleHostQueue.fetch(
2852 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2853 for block in blocks:
2854 block.delete()
mblighe2586682008-02-29 22:45:46 +00002855
2856
showard2bab8f42008-11-12 18:15:22 +00002857 def set_execution_subdir(self, subdir=None):
2858 if subdir is None:
showarda9545c02009-12-18 22:44:26 +00002859 assert self.host
2860 subdir = self.host.hostname
showard2bab8f42008-11-12 18:15:22 +00002861 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002862
2863
showard6355f6b2008-12-05 18:52:13 +00002864 def _get_hostname(self):
2865 if self.host:
2866 return self.host.hostname
2867 return 'no host'
2868
2869
showard170873e2009-01-07 00:22:26 +00002870 def __str__(self):
showard828fc4c2009-09-14 20:31:00 +00002871 flags = []
2872 if self.active:
2873 flags.append('active')
2874 if self.complete:
2875 flags.append('complete')
2876 if self.deleted:
2877 flags.append('deleted')
2878 if self.aborted:
2879 flags.append('aborted')
2880 flags_str = ','.join(flags)
2881 if flags_str:
2882 flags_str = ' [%s]' % flags_str
2883 return "%s/%d (%d) %s%s" % (self._get_hostname(), self.job.id, self.id,
2884 self.status, flags_str)
showard170873e2009-01-07 00:22:26 +00002885
2886
jadmanski0afbb632008-06-06 21:10:57 +00002887 def set_status(self, status):
showard56824072009-10-12 20:30:21 +00002888 logging.info("%s -> %s", self, status)
mblighf8c624d2008-07-03 16:58:45 +00002889
showard56824072009-10-12 20:30:21 +00002890 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002891
showard8cc058f2009-09-08 16:26:33 +00002892 if status in (models.HostQueueEntry.Status.QUEUED,
2893 models.HostQueueEntry.Status.PARSING):
jadmanski0afbb632008-06-06 21:10:57 +00002894 self.update_field('complete', False)
2895 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002896
showard8cc058f2009-09-08 16:26:33 +00002897 if status in (models.HostQueueEntry.Status.PENDING,
2898 models.HostQueueEntry.Status.RUNNING,
2899 models.HostQueueEntry.Status.VERIFYING,
2900 models.HostQueueEntry.Status.STARTING,
2901 models.HostQueueEntry.Status.GATHERING):
jadmanski0afbb632008-06-06 21:10:57 +00002902 self.update_field('complete', False)
2903 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002904
showard8cc058f2009-09-08 16:26:33 +00002905 if status in (models.HostQueueEntry.Status.FAILED,
2906 models.HostQueueEntry.Status.COMPLETED,
2907 models.HostQueueEntry.Status.STOPPED,
2908 models.HostQueueEntry.Status.ABORTED):
jadmanski0afbb632008-06-06 21:10:57 +00002909 self.update_field('complete', True)
2910 self.update_field('active', False)
showardf85a0b72009-10-07 20:48:45 +00002911 self._on_complete()
showardc85c21b2008-11-24 22:17:37 +00002912
2913 should_email_status = (status.lower() in _notify_email_statuses or
2914 'all' in _notify_email_statuses)
2915 if should_email_status:
2916 self._email_on_status(status)
2917
2918 self._email_on_job_complete()
2919
2920
showardf85a0b72009-10-07 20:48:45 +00002921 def _on_complete(self):
showardd1195652009-12-08 22:21:02 +00002922 self.job.stop_if_necessary()
showardf85a0b72009-10-07 20:48:45 +00002923 if not self.execution_subdir:
2924 return
2925 # unregister any possible pidfiles associated with this queue entry
2926 for pidfile_name in _ALL_PIDFILE_NAMES:
2927 pidfile_id = _drone_manager.get_pidfile_id_from(
2928 self.execution_path(), pidfile_name=pidfile_name)
2929 _drone_manager.unregister_pidfile(pidfile_id)
2930
2931
showardc85c21b2008-11-24 22:17:37 +00002932 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002933 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002934
2935 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2936 self.job.id, self.job.name, hostname, status)
2937 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2938 self.job.id, self.job.name, hostname, status,
2939 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002940 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002941
2942
2943 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002944 if not self.job.is_finished():
2945 return
showard542e8402008-09-19 20:16:18 +00002946
showardc85c21b2008-11-24 22:17:37 +00002947 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002948 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002949 for queue_entry in hosts_queue:
2950 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002951 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002952 queue_entry.status))
2953
2954 summary_text = "\n".join(summary_text)
2955 status_counts = models.Job.objects.get_status_counts(
2956 [self.job.id])[self.job.id]
2957 status = ', '.join('%d %s' % (count, status) for status, count
2958 in status_counts.iteritems())
2959
2960 subject = 'Autotest: Job ID: %s "%s" %s' % (
2961 self.job.id, self.job.name, status)
2962 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2963 self.job.id, self.job.name, status, self._view_job_url(),
2964 summary_text)
showard170873e2009-01-07 00:22:26 +00002965 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002966
2967
showard8cc058f2009-09-08 16:26:33 +00002968 def schedule_pre_job_tasks(self, assigned_host=None):
showard77182562009-06-10 00:16:05 +00002969 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002970 assert assigned_host
2971 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002972 if self.host_id is None:
2973 self.set_host(assigned_host)
2974 else:
2975 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002976
showard2ca64c92009-12-10 21:41:02 +00002977 logging.info("%s/%s/%s (job %s, entry %s) scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002978 self.job.name, self.meta_host, self.atomic_group_id,
showard2ca64c92009-12-10 21:41:02 +00002979 self.job.id, self.id, self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002980
showard8cc058f2009-09-08 16:26:33 +00002981 self._do_schedule_pre_job_tasks()
showard77182562009-06-10 00:16:05 +00002982
2983
showard8cc058f2009-09-08 16:26:33 +00002984 def _do_schedule_pre_job_tasks(self):
        # Every host goes through the Verifying stage (which may or may not
        # actually do anything, as determined by schedule_pre_job_tasks).
2987 self.set_status(models.HostQueueEntry.Status.VERIFYING)
showard8cc058f2009-09-08 16:26:33 +00002988 self.job.schedule_pre_job_tasks(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002989
showard6ae5ea92009-02-25 00:11:51 +00002990
jadmanski0afbb632008-06-06 21:10:57 +00002991 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002992 assert self.host
showard8cc058f2009-09-08 16:26:33 +00002993 self.set_status(models.HostQueueEntry.Status.QUEUED)
showard12f3e322009-05-13 21:27:42 +00002994 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002995 # verify/cleanup failure sets the execution subdir, so reset it here
2996 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002997 if self.meta_host:
2998 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002999
3000
jadmanskif7fa2cc2008-10-01 14:13:23 +00003001 @property
3002 def aborted_by(self):
3003 self._load_abort_info()
3004 return self._aborted_by
3005
3006
3007 @property
3008 def aborted_on(self):
3009 self._load_abort_info()
3010 return self._aborted_on
3011
3012
3013 def _load_abort_info(self):
3014 """ Fetch info about who aborted the job. """
3015 if hasattr(self, "_aborted_by"):
3016 return
3017 rows = _db.execute("""
3018 SELECT users.login, aborted_host_queue_entries.aborted_on
3019 FROM aborted_host_queue_entries
3020 INNER JOIN users
3021 ON users.id = aborted_host_queue_entries.aborted_by_id
3022 WHERE aborted_host_queue_entries.queue_entry_id = %s
3023 """, (self.id,))
3024 if rows:
3025 self._aborted_by, self._aborted_on = rows[0]
3026 else:
3027 self._aborted_by = self._aborted_on = None
3028
3029
showardb2e2c322008-10-14 17:33:55 +00003030 def on_pending(self):
3031 """
3032 Called when an entry in a synchronous job has passed verify. If the
showard8cc058f2009-09-08 16:26:33 +00003033 job is ready to run, sets the entries to STARTING. Otherwise, it leaves
3034 them in PENDING.
showardb2e2c322008-10-14 17:33:55 +00003035 """
showard8cc058f2009-09-08 16:26:33 +00003036 self.set_status(models.HostQueueEntry.Status.PENDING)
3037 self.host.set_status(models.Host.Status.PENDING)
showardb000a8d2009-07-28 20:02:07 +00003038
3039 # Some debug code here: sends an email if an asynchronous job does not
3040 # immediately enter Starting.
3041 # TODO: Remove this once we figure out why asynchronous jobs are getting
3042 # stuck in Pending.
showard8cc058f2009-09-08 16:26:33 +00003043 self.job.run_if_ready(queue_entry=self)
3044 if (self.job.synch_count == 1 and
3045 self.status == models.HostQueueEntry.Status.PENDING):
showardb000a8d2009-07-28 20:02:07 +00003046 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
3047 message = 'Asynchronous job stuck in Pending'
3048 email_manager.manager.enqueue_notify_email(subject, message)
showardb2e2c322008-10-14 17:33:55 +00003049
3050
showardd3dc1992009-04-22 21:01:40 +00003051 def abort(self, dispatcher):
3052 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00003053
showardd3dc1992009-04-22 21:01:40 +00003054 Status = models.HostQueueEntry.Status
showard8cc058f2009-09-08 16:26:33 +00003055 if self.status in (Status.GATHERING, Status.PARSING):
showardd3dc1992009-04-22 21:01:40 +00003056 # do nothing; post-job tasks will finish and then mark this entry
3057 # with status "Aborted" and take care of the host
3058 return
3059
showard8cc058f2009-09-08 16:26:33 +00003060 if self.status in (Status.STARTING, Status.PENDING, Status.RUNNING):
3061 assert not dispatcher.get_agents_for_entry(self)
showardd3dc1992009-04-22 21:01:40 +00003062 self.host.set_status(models.Host.Status.READY)
3063 elif self.status == Status.VERIFYING:
showard8cc058f2009-09-08 16:26:33 +00003064 models.SpecialTask.objects.create(
3065 task=models.SpecialTask.Task.CLEANUP,
showard9bb960b2009-11-19 01:02:11 +00003066 host=models.Host.objects.get(id=self.host.id),
3067 requested_by=self.job.owner_model())
showardd3dc1992009-04-22 21:01:40 +00003068
3069 self.set_status(Status.ABORTED)
showardd2014822009-10-12 20:26:58 +00003070 self.job.abort_delay_ready_task()
showard170873e2009-01-07 00:22:26 +00003071
showard8cc058f2009-09-08 16:26:33 +00003072
3073 def get_group_name(self):
3074 atomic_group = self.atomic_group
3075 if not atomic_group:
3076 return ''
3077
3078 # Look at any meta_host and dependency labels and pick the first
3079 # one that also specifies this atomic group. Use that label name
3080 # as the group name if possible (it is more specific).
3081 for label in self.get_labels():
3082 if label.atomic_group_id:
3083 assert label.atomic_group_id == atomic_group.id
3084 return label.name
3085 return atomic_group.name
3086
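    # Example (label and group names are hypothetical): an entry whose
    # meta_host label 'rack4-netbooks' belongs to atomic group 'netbooks'
    # gets the group name 'rack4-netbooks'; with no such label the atomic
    # group's own name ('netbooks') is used, and entries without an atomic
    # group get ''.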
3087
showard170873e2009-01-07 00:22:26 +00003088 def execution_tag(self):
3089 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00003090 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00003091
3092
showarded2afea2009-07-07 20:54:07 +00003093 def execution_path(self):
3094 return self.execution_tag()
3095
3096
showarda9545c02009-12-18 22:44:26 +00003097 def set_started_on_now(self):
3098 self.update_field('started_on', datetime.datetime.now())
3099
3100
3101 def is_hostless(self):
3102 return (self.host_id is None
3103 and self.meta_host is None
3104 and self.atomic_group_id is None)
3105
3106
mbligh36768f02008-02-22 18:28:33 +00003107class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00003108 _table_name = 'jobs'
3109 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
3110 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00003111 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00003112 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00003113
showard77182562009-06-10 00:16:05 +00003114 # This does not need to be a column in the DB. The delays are likely to
3115 # be configured short. If the scheduler is stopped and restarted in
3116 # the middle of a job's delay cycle, the delay cycle will either be
3117 # repeated or skipped depending on the number of Pending machines found
3118 # when the restarted scheduler recovers to track it. Not a problem.
3119 #
3120 # A reference to the DelayedCallTask that will wake up the job should
3121 # no other HQEs change state in time. Its end_time attribute is used
3122 # by our run_with_ready_delay() method to determine if the wait is over.
3123 _delay_ready_task = None
3124
3125 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs in case a delay was running when the
3127 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00003128
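    # A sketch of the delay lifecycle described above (the ordering shows the
    # normal case rather than a guarantee):
    #
    #   1. schedule_delayed_callback_task() marks its HQE Pending and creates
    #      a DelayedCallTask whose callback is run_job_after_delay.
    #   2. As more HQEs reach Pending, run_with_ready_delay() starts the job
    #      early once _max_hosts_needed_to_run() entries are Pending.
    #   3. If the delay expires first, run_job_after_delay() either runs the
    #      job or requests an abort when too few hosts showed up.
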
showarda3c58572009-03-12 20:36:59 +00003129 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00003130 assert id or row
showarda3c58572009-03-12 20:36:59 +00003131 super(Job, self).__init__(id=id, row=row, **kwargs)
showard9bb960b2009-11-19 01:02:11 +00003132 self._owner_model = None # caches model instance of owner
3133
3134
3135 def owner_model(self):
3136 # work around the fact that the Job owner field is a string, not a
3137 # foreign key
3138 if not self._owner_model:
3139 self._owner_model = models.User.objects.get(login=self.owner)
3140 return self._owner_model
mbligh36768f02008-02-22 18:28:33 +00003141
mblighe2586682008-02-29 22:45:46 +00003142
jadmanski0afbb632008-06-06 21:10:57 +00003143 def is_server_job(self):
3144 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00003145
3146
showard170873e2009-01-07 00:22:26 +00003147 def tag(self):
3148 return "%s-%s" % (self.id, self.owner)
3149
3150
jadmanski0afbb632008-06-06 21:10:57 +00003151 def get_host_queue_entries(self):
3152 rows = _db.execute("""
3153 SELECT * FROM host_queue_entries
3154 WHERE job_id= %s
3155 """, (self.id,))
3156 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00003157
        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00003159
jadmanski0afbb632008-06-06 21:10:57 +00003160 return entries
mbligh36768f02008-02-22 18:28:33 +00003161
3162
jadmanski0afbb632008-06-06 21:10:57 +00003163 def set_status(self, status, update_queues=False):
        self.update_field('status', status)
3165
3166 if update_queues:
3167 for queue_entry in self.get_host_queue_entries():
3168 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00003169
3170
showard77182562009-06-10 00:16:05 +00003171 def _atomic_and_has_started(self):
3172 """
3173 @returns True if any of the HostQueueEntries associated with this job
3174 have entered the Status.STARTING state or beyond.
3175 """
3176 atomic_entries = models.HostQueueEntry.objects.filter(
3177 job=self.id, atomic_group__isnull=False)
3178 if atomic_entries.count() <= 0:
3179 return False
3180
showardaf8b4ca2009-06-16 18:47:26 +00003181 # These states may *only* be reached if Job.run() has been called.
3182 started_statuses = (models.HostQueueEntry.Status.STARTING,
3183 models.HostQueueEntry.Status.RUNNING,
3184 models.HostQueueEntry.Status.COMPLETED)
3185
3186 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00003187 return started_entries.count() > 0
3188
3189
showard708b3522009-08-20 23:26:15 +00003190 def _hosts_assigned_count(self):
3191 """The number of HostQueueEntries assigned a Host for this job."""
3192 entries = models.HostQueueEntry.objects.filter(job=self.id,
3193 host__isnull=False)
3194 return entries.count()
3195
3196
showard77182562009-06-10 00:16:05 +00003197 def _pending_count(self):
3198 """The number of HostQueueEntries for this job in the Pending state."""
3199 pending_entries = models.HostQueueEntry.objects.filter(
3200 job=self.id, status=models.HostQueueEntry.Status.PENDING)
3201 return pending_entries.count()
3202
3203
showardd07a5f32009-12-07 19:36:20 +00003204 def _max_hosts_needed_to_run(self, atomic_group):
showardd2014822009-10-12 20:26:58 +00003205 """
3206 @param atomic_group: The AtomicGroup associated with this job that we
showardd07a5f32009-12-07 19:36:20 +00003207 are using to set an upper bound on the threshold.
3208 @returns The maximum number of HostQueueEntries assigned a Host before
showardd2014822009-10-12 20:26:58 +00003209 this job can run.
3210 """
3211 return min(self._hosts_assigned_count(),
3212 atomic_group.max_number_of_machines)
3213
3214
showardd07a5f32009-12-07 19:36:20 +00003215 def _min_hosts_needed_to_run(self):
        """Return the minimum number of hosts needed to run this job."""
3217 return self.synch_count
3218
3219
jadmanski0afbb632008-06-06 21:10:57 +00003220 def is_ready(self):
showard77182562009-06-10 00:16:05 +00003221 # NOTE: Atomic group jobs stop reporting ready after they have been
3222 # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
3224 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00003225 pending_count = self._pending_count()
3226 atomic_and_has_started = self._atomic_and_has_started()
3227 ready = (pending_count >= self.synch_count
showardd2014822009-10-12 20:26:58 +00003228 and not atomic_and_has_started)
showardb000a8d2009-07-28 20:02:07 +00003229
3230 if not ready:
3231 logging.info(
3232 'Job %s not ready: %s pending, %s required '
3233 '(Atomic and started: %s)',
3234 self, pending_count, self.synch_count,
3235 atomic_and_has_started)
3236
3237 return ready
mbligh36768f02008-02-22 18:28:33 +00003238
3239
jadmanski0afbb632008-06-06 21:10:57 +00003240 def num_machines(self, clause = None):
3241 sql = "job_id=%s" % self.id
3242 if clause:
3243 sql += " AND (%s)" % clause
3244 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00003245
3246
jadmanski0afbb632008-06-06 21:10:57 +00003247 def num_queued(self):
3248 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00003249
3250
jadmanski0afbb632008-06-06 21:10:57 +00003251 def num_active(self):
3252 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00003253
3254
jadmanski0afbb632008-06-06 21:10:57 +00003255 def num_complete(self):
3256 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00003257
3258
jadmanski0afbb632008-06-06 21:10:57 +00003259 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00003260 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00003261
mbligh36768f02008-02-22 18:28:33 +00003262
showard6bb7c292009-01-30 01:44:51 +00003263 def _not_yet_run_entries(self, include_verifying=True):
3264 statuses = [models.HostQueueEntry.Status.QUEUED,
3265 models.HostQueueEntry.Status.PENDING]
3266 if include_verifying:
3267 statuses.append(models.HostQueueEntry.Status.VERIFYING)
3268 return models.HostQueueEntry.objects.filter(job=self.id,
3269 status__in=statuses)
3270
3271
3272 def _stop_all_entries(self):
3273 entries_to_stop = self._not_yet_run_entries(
3274 include_verifying=False)
3275 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00003276 assert not child_entry.complete, (
3277 '%s status=%s, active=%s, complete=%s' %
3278 (child_entry.id, child_entry.status, child_entry.active,
3279 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00003280 if child_entry.status == models.HostQueueEntry.Status.PENDING:
3281 child_entry.host.status = models.Host.Status.READY
3282 child_entry.host.save()
3283 child_entry.status = models.HostQueueEntry.Status.STOPPED
3284 child_entry.save()
3285
showard2bab8f42008-11-12 18:15:22 +00003286 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00003287 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00003288 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00003289 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00003290
3291
jadmanski0afbb632008-06-06 21:10:57 +00003292 def write_to_machines_file(self, queue_entry):
showarda9545c02009-12-18 22:44:26 +00003293 hostname = queue_entry.host.hostname
showard170873e2009-01-07 00:22:26 +00003294 file_path = os.path.join(self.tag(), '.machines')
3295 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003296
3297
showardf1ae3542009-05-11 19:26:02 +00003298 def _next_group_name(self, group_name=''):
3299 """@returns a directory name to use for the next host group results."""
3300 if group_name:
3301 # Sanitize for use as a pathname.
3302 group_name = group_name.replace(os.path.sep, '_')
3303 if group_name.startswith('.'):
3304 group_name = '_' + group_name[1:]
3305 # Add a separator between the group name and 'group%d'.
3306 group_name += '.'
3307 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003308 query = models.HostQueueEntry.objects.filter(
3309 job=self.id).values('execution_subdir').distinct()
3310 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003311 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3312 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003313 if ids:
3314 next_id = max(ids) + 1
3315 else:
3316 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003317 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003318
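    # Illustrative results of _next_group_name() (the pre-existing subdirs
    # described are hypothetical):
    #
    #     no prior group subdirs, group_name=''    -> 'group0'
    #     existing 'group0' and 'group1'           -> 'group2'
    #     group_name='rack/42', no prior groups    -> 'rack_42.group0'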
3319
showarddb502762009-09-09 15:31:20 +00003320 def _write_control_file(self, execution_path):
showard170873e2009-01-07 00:22:26 +00003321 control_path = _drone_manager.attach_file_to_execution(
showarddb502762009-09-09 15:31:20 +00003322 execution_path, self.control_file)
showard170873e2009-01-07 00:22:26 +00003323 return control_path
mbligh36768f02008-02-22 18:28:33 +00003324
showardb2e2c322008-10-14 17:33:55 +00003325
showard2bab8f42008-11-12 18:15:22 +00003326 def get_group_entries(self, queue_entry_from_group):
showard8375ce02009-10-12 20:35:13 +00003327 """
3328 @param queue_entry_from_group: A HostQueueEntry instance to find other
3329 group entries on this job for.
3330
3331 @returns A list of HostQueueEntry objects all executing this job as
3332 part of the same group as the one supplied (having the same
3333 execution_subdir).
3334 """
showard2bab8f42008-11-12 18:15:22 +00003335 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003336 return list(HostQueueEntry.fetch(
3337 where='job_id=%s AND execution_subdir=%s',
3338 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003339
3340
showard8cc058f2009-09-08 16:26:33 +00003341 def get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003342 assert queue_entries
showarddb502762009-09-09 15:31:20 +00003343 execution_path = queue_entries[0].execution_path()
3344 control_path = self._write_control_file(execution_path)
showarda9545c02009-12-18 22:44:26 +00003345 hostnames = ','.join(entry.host.hostname
3346 for entry in queue_entries
3347 if not entry.is_hostless())
mbligh36768f02008-02-22 18:28:33 +00003348
showarddb502762009-09-09 15:31:20 +00003349 execution_tag = queue_entries[0].execution_tag()
showard87ba02a2009-04-20 19:37:32 +00003350 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003351 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003352 ['-P', execution_tag, '-n',
3353 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003354 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003355
jadmanski0afbb632008-06-06 21:10:57 +00003356 if not self.is_server_job():
3357 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003358
showardb2e2c322008-10-14 17:33:55 +00003359 return params
mblighe2586682008-02-29 22:45:46 +00003360
mbligh36768f02008-02-22 18:28:33 +00003361
showardc9ae1782009-01-30 01:42:37 +00003362 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003363 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003364 return True
showard0fc38302008-10-23 00:44:07 +00003365 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showarda9545c02009-12-18 22:44:26 +00003366 return queue_entry.host.dirty
showardc9ae1782009-01-30 01:42:37 +00003367 return False
showard21baa452008-10-21 00:08:39 +00003368
showardc9ae1782009-01-30 01:42:37 +00003369
showard8cc058f2009-09-08 16:26:33 +00003370 def _should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003371 do_not_verify = (queue_entry.host.protection ==
3372 host_protections.Protection.DO_NOT_VERIFY)
3373 if do_not_verify:
3374 return False
3375 return self.run_verify
3376
3377
showard8cc058f2009-09-08 16:26:33 +00003378 def schedule_pre_job_tasks(self, queue_entry):
showard77182562009-06-10 00:16:05 +00003379 """
        Schedule any tasks that must run against the host_queue_entry before
        it may be used to run this Job (such as Cleanup & Verify).

        If no pre-job task is needed, HostQueueEntry.on_pending() is called
        immediately, which continues the flow of the job; otherwise a
        SpecialTask (Cleanup or Verify) is created for the entry's host.
3387 """
showardc9ae1782009-01-30 01:42:37 +00003388 if self._should_run_cleanup(queue_entry):
showard8cc058f2009-09-08 16:26:33 +00003389 task = models.SpecialTask.Task.CLEANUP
3390 elif self._should_run_verify(queue_entry):
3391 task = models.SpecialTask.Task.VERIFY
3392 else:
3393 queue_entry.on_pending()
3394 return
3395
showard9bb960b2009-11-19 01:02:11 +00003396 queue_entry = models.HostQueueEntry.objects.get(id=queue_entry.id)
showard8cc058f2009-09-08 16:26:33 +00003397 models.SpecialTask.objects.create(
showard9bb960b2009-11-19 01:02:11 +00003398 host=models.Host.objects.get(id=queue_entry.host_id),
3399 queue_entry=queue_entry, task=task)
showard21baa452008-10-21 00:08:39 +00003400
3401
showardf1ae3542009-05-11 19:26:02 +00003402 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003403 if len(queue_entries) == 1:
showarda9545c02009-12-18 22:44:26 +00003404 group_subdir_name = queue_entries[0].host.hostname
showard2bab8f42008-11-12 18:15:22 +00003405 else:
showardf1ae3542009-05-11 19:26:02 +00003406 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003407 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003408 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003409 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003410
3411 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003412 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003413
3414
3415 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003416 """
        @returns A list of HostQueueEntry instances to be used to run this
                Job, or an empty list if too few Pending entries are
                available to satisfy synch_count. The chosen entries are
                assigned a shared results group name before being returned.
showardf1ae3542009-05-11 19:26:02 +00003420 """
showard77182562009-06-10 00:16:05 +00003421 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003422 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003423 if atomic_group:
3424 num_entries_wanted = atomic_group.max_number_of_machines
3425 else:
3426 num_entries_wanted = self.synch_count
3427 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003428
showardf1ae3542009-05-11 19:26:02 +00003429 if num_entries_wanted > 0:
3430 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003431 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003432 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003433 params=(self.id, include_queue_entry.id)))
3434
3435 # Sort the chosen hosts by hostname before slicing.
3436 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3437 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3438 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3439 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003440
showardf1ae3542009-05-11 19:26:02 +00003441 # Sanity check. We'll only ever be called if this can be met.
showard828fc4c2009-09-14 20:31:00 +00003442 if len(chosen_entries) < self.synch_count:
3443 message = ('job %s got less than %s chosen entries: %s' % (
3444 self.id, self.synch_count, chosen_entries))
3445 logging.error(message)
3446 email_manager.manager.enqueue_notify_email(
3447 'Job not started, too few chosen entries', message)
3448 return []
showardf1ae3542009-05-11 19:26:02 +00003449
showard8cc058f2009-09-08 16:26:33 +00003450 group_name = include_queue_entry.get_group_name()
showardf1ae3542009-05-11 19:26:02 +00003451
3452 self._assign_new_group(chosen_entries, group_name=group_name)
showard8cc058f2009-09-08 16:26:33 +00003453 return chosen_entries
showard2bab8f42008-11-12 18:15:22 +00003454
3455
showard77182562009-06-10 00:16:05 +00003456 def run_if_ready(self, queue_entry):
3457 """
showard8375ce02009-10-12 20:35:13 +00003458 Run this job by kicking its HQEs into status='Starting' if enough
3459 hosts are ready for it to run.
3460
3461 Cleans up by kicking HQEs into status='Stopped' if this Job is not
3462 ready to run.
showard77182562009-06-10 00:16:05 +00003463 """
showardb2e2c322008-10-14 17:33:55 +00003464 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003465 self.stop_if_necessary()
showard8cc058f2009-09-08 16:26:33 +00003466 elif queue_entry.atomic_group:
3467 self.run_with_ready_delay(queue_entry)
3468 else:
3469 self.run(queue_entry)
showard77182562009-06-10 00:16:05 +00003470
3471
3472 def run_with_ready_delay(self, queue_entry):
3473 """
        Run the job now if enough hosts are already Pending; otherwise leave
        the entry waiting so that more hosts can enter the Pending state.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and to pass to run() if the job is started.

        The job runs immediately when the delay is disabled, when the
        maximum useful number of hosts is already Pending, or when an
        existing delay has expired; otherwise the queue entry is set to
        Status.WAITING.
3482 """
3483 assert queue_entry.job_id == self.id
3484 assert queue_entry.atomic_group
3485 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showardd2014822009-10-12 20:26:58 +00003486 over_max_threshold = (self._pending_count() >=
showardd07a5f32009-12-07 19:36:20 +00003487 self._max_hosts_needed_to_run(queue_entry.atomic_group))
showard77182562009-06-10 00:16:05 +00003488 delay_expired = (self._delay_ready_task and
3489 time.time() >= self._delay_ready_task.end_time)
3490
3491 # Delay is disabled or we already have enough? Do not wait to run.
3492 if not delay or over_max_threshold or delay_expired:
showard8cc058f2009-09-08 16:26:33 +00003493 self.run(queue_entry)
3494 else:
3495 queue_entry.set_status(models.HostQueueEntry.Status.WAITING)
showard77182562009-06-10 00:16:05 +00003496
showard8cc058f2009-09-08 16:26:33 +00003497
showardd07a5f32009-12-07 19:36:20 +00003498 def request_abort(self):
3499 """Request that this Job be aborted on the next scheduler cycle."""
3500 queue_entries = HostQueueEntry.fetch(where="job_id=%s" % self.id)
3501 for hqe in queue_entries:
3502 hqe.update_field('aborted', True)
3503
3504
showard8cc058f2009-09-08 16:26:33 +00003505 def schedule_delayed_callback_task(self, queue_entry):
3506 queue_entry.set_status(models.HostQueueEntry.Status.PENDING)
3507
showard77182562009-06-10 00:16:05 +00003508 if self._delay_ready_task:
3509 return None
3510
showard8cc058f2009-09-08 16:26:33 +00003511 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3512
showard77182562009-06-10 00:16:05 +00003513 def run_job_after_delay():
showardd2014822009-10-12 20:26:58 +00003514 logging.info('Job %s done waiting for extra hosts.', self)
3515 # Check to see if the job is still relevant. It could have aborted
            # while we were waiting or hosts could have disappeared, etc.
showardd07a5f32009-12-07 19:36:20 +00003517 if self._pending_count() < self._min_hosts_needed_to_run():
showardd2014822009-10-12 20:26:58 +00003518 logging.info('Job %s had too few Pending hosts after waiting '
3519 'for extras. Not running.', self)
showardd07a5f32009-12-07 19:36:20 +00003520 self.request_abort()
showardd2014822009-10-12 20:26:58 +00003521 return
showard77182562009-06-10 00:16:05 +00003522 return self.run(queue_entry)
3523
showard708b3522009-08-20 23:26:15 +00003524 logging.info('Job %s waiting up to %s seconds for more hosts.',
3525 self.id, delay)
showard77182562009-06-10 00:16:05 +00003526 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3527 callback=run_job_after_delay)
showard8cc058f2009-09-08 16:26:33 +00003528 return self._delay_ready_task
showard77182562009-06-10 00:16:05 +00003529
3530
3531 def run(self, queue_entry):
3532 """
3533 @param queue_entry: The HostQueueEntry instance calling this method.
showard77182562009-06-10 00:16:05 +00003534 """
3535 if queue_entry.atomic_group and self._atomic_and_has_started():
3536 logging.error('Job.run() called on running atomic Job %d '
3537 'with HQE %s.', self.id, queue_entry)
showard8cc058f2009-09-08 16:26:33 +00003538 return
3539 queue_entries = self._choose_group_to_run(queue_entry)
showard828fc4c2009-09-14 20:31:00 +00003540 if queue_entries:
3541 self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00003542
3543
showard8cc058f2009-09-08 16:26:33 +00003544 def _finish_run(self, queue_entries):
showardb2ccdda2008-10-28 20:39:05 +00003545 for queue_entry in queue_entries:
showard8cc058f2009-09-08 16:26:33 +00003546 queue_entry.set_status(models.HostQueueEntry.Status.STARTING)
showardd2014822009-10-12 20:26:58 +00003547 self.abort_delay_ready_task()
3548
3549
3550 def abort_delay_ready_task(self):
3551 """Abort the delayed task associated with this job, if any."""
showard77182562009-06-10 00:16:05 +00003552 if self._delay_ready_task:
3553 # Cancel any pending callback that would try to run again
3554 # as we are already running.
3555 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003556
showardd2014822009-10-12 20:26:58 +00003557
showardb000a8d2009-07-28 20:02:07 +00003558 def __str__(self):
3559 return '%s-%s' % (self.id, self.owner)
3560
3561
mbligh36768f02008-02-22 18:28:33 +00003562if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003563 main()