blob: 82673a1cd5c739e5aed60f4fe81ed4d522195e16 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow an installed autotest tree to override the relative default above.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# pidfile names written by the processes the scheduler spawns
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, initialized by main()/init().
_db = None                      # DatabaseConnection, set up in init()
_shutdown = False               # set by the SIGINT handler to stop the loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False           # --test flag: dummy autoserv, no parsing
_base_url = None                # frontend URL used in notification emails
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Scheduler entry point: log and re-raise any escaping exception."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # Deliberate exits (e.g. scheduler disabled) are not errors;
        # propagate them without logging a traceback.
        raise
    except:
        # Record the traceback before the process dies.
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """
    Parse command-line options, read scheduler configuration, and run the
    dispatcher loop until a shutdown is requested.

    Expects a single positional argument: the results directory.  Exits
    with status 1 if the scheduler is disabled in the global config or if
    the frontend base URL cannot be determined.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # NOTE: fixed grammar of this operator-facing message ("enabled it"
        # -> "enable it").
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give site-specific code a chance to hook in before we start.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Deliberately bare: any failure (including KeyboardInterrupt) must
        # be mailed out before we fall through to the cleanup below.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # Always shut down cleanly so drones and the status server are released.
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Configure scheduler logging.

    The log directory and file name may be overridden through the
    AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME environment
    variables; when unset, the SchedulerLoggingConfig defaults apply.
    """
    environ = os.environ
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(),
        log_dir=environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None),
        logfile_name=environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None))
173
174
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the dispatcher loop.

    Sets the module-level _shutdown flag, which the main loop checks once
    per tick; cleanup then runs before the process exits.
    """
    global _shutdown
    logging.info("Shutdown request received.")
    _shutdown = True
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """
    One-time scheduler startup: write the pid file, connect to the
    database, install the SIGINT handler and initialize the drone manager.

    Mutates the module-level _db global.  Must run before the first
    Dispatcher tick.
    """
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point the scheduler at the stress-test database instead of the
        # production one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE: renamed this local from "drones" -- the old name shadowed the
    # imported drones module used at module level.
    drones_config = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones_config.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to look up the Job object.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job is None and queue_entry is not None:
        job = queue_entry.job
    if job is not None:
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
236
237
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
240
241
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against Ready hosts.

    refresh() snapshots scheduling state (available hosts, ACLs, label,
    dependency and atomic-group relationships) from the database once per
    scheduling cycle; the find_eligible_* methods then consume that cached
    state, removing hosts from the internal availability map as they are
    handed out so a host is never scheduled twice in one cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for every unlocked, idle, Ready host."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render id_list as a comma-separated string for an SQL IN clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute query (which must contain one %s placeholder for an id
        list) and fold its two-column result into a dict of id -> set of
        ids.  Returns {} without touching the database when id_list is
        empty.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """
        Build {left_id: set(right_ids)} from two-column rows; with
        flip=True the mapping direction is reversed.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_ids)} via each job's owner."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_ids)} that each job must not use."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_ids)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_ids)} for the given hosts."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        Return the label/host relation both ways:
        (labels_to_hosts, hosts_to_labels), each a dict of id -> set.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """
        Re-snapshot all cached scheduling state for this cycle, restricted
        to the jobs referenced by pending_queue_entries.
        """
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost.
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring, this logs rather than raises and
            # arbitrarily returns one of the group ids.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if ACLs, dependencies and atomic-group rules all allow
        scheduling queue_entry on host_id."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is known to this cycle and flagged invalid."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible, else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """
        Claim the first usable, eligible host carrying the entry's metahost
        label, or return None.  Prunes unusable hosts from the cached label
        map as a side effect.
        """
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000608 def __init__(self):
609 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000610 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000611 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000612 user_cleanup_time = scheduler_config.config.clean_interval
613 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
614 _db, user_cleanup_time)
615 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000616 self._host_agents = {}
617 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
showard915958d2009-04-22 21:00:58 +0000620 def initialize(self, recover_hosts=True):
621 self._periodic_cleanup.initialize()
622 self._24hr_upkeep.initialize()
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 # always recover processes
625 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000626
jadmanski0afbb632008-06-06 21:10:57 +0000627 if recover_hosts:
628 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
jadmanski0afbb632008-06-06 21:10:57 +0000631 def tick(self):
showard170873e2009-01-07 00:22:26 +0000632 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000633 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000634 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000635 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000636 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000637 self._schedule_new_jobs()
638 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000639 _drone_manager.execute_actions()
640 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
mblighf3294cc2009-04-08 21:17:38 +0000643 def _run_cleanup(self):
644 self._periodic_cleanup.run_cleanup_maybe()
645 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def add_agent(self, agent):
660 self._agents.append(agent)
661 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000662 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
663 self._register_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000690 self._register_pidfiles()
691 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000692 self._recover_all_recoverable_entries()
showard6af73ad2009-07-28 20:00:58 +0000693 self._requeue_starting_entries()
showard6878e8b2009-07-20 22:37:45 +0000694 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000695 self._reverify_remaining_hosts()
696 # reinitialize drones after killing orphaned processes, since they can
697 # leave around files when they die
698 _drone_manager.execute_actions()
699 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000700
showard170873e2009-01-07 00:22:26 +0000701
    def _register_pidfiles(self):
        """
        Register with the drone manager every pidfile that a potentially
        still-running queue entry or special task may have written, so the
        next drone refresh reads their contents.
        """
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
                where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            # each execution dir may hold any of the known pidfile names
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000713
714
showarded2afea2009-07-07 20:54:07 +0000715 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
716 run_monitor = PidfileRunMonitor()
717 run_monitor.attach_to_existing_process(execution_path,
718 pidfile_name=pidfile_name)
719 if run_monitor.has_process():
720 orphans.discard(run_monitor.get_process())
721 return run_monitor, '(process %s)' % run_monitor.get_process()
722 return None, 'without process'
723
724
showardd3dc1992009-04-22 21:01:40 +0000725 def _recover_entries_with_status(self, status, orphans, pidfile_name,
726 recover_entries_fn):
727 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000728 for queue_entry in queue_entries:
729 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000730 # synchronous job we've already recovered
731 continue
showardd3dc1992009-04-22 21:01:40 +0000732 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000733 run_monitor, process_string = self._get_recovery_run_monitor(
734 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000735
showarded2afea2009-07-07 20:54:07 +0000736 logging.info('Recovering %s entry %s %s',status.lower(),
737 ', '.join(str(entry) for entry in queue_entries),
738 process_string)
showardd3dc1992009-04-22 21:01:40 +0000739 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000740
741
showard6878e8b2009-07-20 22:37:45 +0000742 def _check_for_remaining_orphan_processes(self, orphans):
743 if not orphans:
744 return
745 subject = 'Unrecovered orphan autoserv processes remain'
746 message = '\n'.join(str(process) for process in orphans)
747 email_manager.manager.enqueue_notify_email(subject, message)
748 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000749
showard170873e2009-01-07 00:22:26 +0000750
showardd3dc1992009-04-22 21:01:40 +0000751 def _recover_running_entries(self, orphans):
752 def recover_entries(job, queue_entries, run_monitor):
753 if run_monitor is not None:
showarded2afea2009-07-07 20:54:07 +0000754 queue_task = QueueTask(job=job, queue_entries=queue_entries,
755 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000756 self.add_agent(Agent(tasks=[queue_task],
757 num_processes=len(queue_entries)))
showard6878e8b2009-07-20 22:37:45 +0000758 else:
759 # we could do better, but this retains legacy behavior for now
760 for queue_entry in queue_entries:
761 logging.info('Requeuing running HQE %s since it has no '
762 'process' % queue_entry)
763 queue_entry.requeue()
showardd3dc1992009-04-22 21:01:40 +0000764
765 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000766 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000767 recover_entries)
768
769
770 def _recover_gathering_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000773 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000774 self.add_agent(Agent([gather_task]))
775
776 self._recover_entries_with_status(
777 models.HostQueueEntry.Status.GATHERING,
778 orphans, _CRASHINFO_PID_FILE, recover_entries)
779
780
781 def _recover_parsing_entries(self, orphans):
782 def recover_entries(job, queue_entries, run_monitor):
783 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000784 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000785 self.add_agent(Agent([reparse_task], num_processes=0))
786
787 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
788 orphans, _PARSER_PID_FILE,
789 recover_entries)
790
791
    def _recover_all_recoverable_entries(self):
        """
        Recover every recoverable entry type.  Each recovery step discards
        the processes it claims from the orphan set; anything left at the
        end is a genuine orphan and aborts startup.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000799
showard97aed502008-11-04 02:01:24 +0000800
showarded2afea2009-07-07 20:54:07 +0000801 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000802 """\
803 Recovers all special tasks that have started running but have not
804 completed.
805 """
806
807 tasks = models.SpecialTask.objects.filter(is_active=True,
808 is_complete=False)
809 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000810 for task in tasks.order_by('-queue_entry__id'):
showarded2afea2009-07-07 20:54:07 +0000811 assert not self.host_has_agent(task.host)
showard2fe3f1d2009-07-06 20:19:11 +0000812
813 host = Host(id=task.host.id)
814 queue_entry = None
815 if task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000816 queue_entry = HostQueueEntry(id=task.queue_entry.id)
showard2fe3f1d2009-07-06 20:19:11 +0000817
showarded2afea2009-07-07 20:54:07 +0000818 run_monitor, process_string = self._get_recovery_run_monitor(
819 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
820
821 logging.info('Recovering %s %s', task, process_string)
822 self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000823
824
showarded2afea2009-07-07 20:54:07 +0000825 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000826 """\
827 Recovers a single special task.
828 """
829 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000830 agent_tasks = self._recover_verify(task, host, queue_entry,
831 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000832 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000833 agent_tasks = self._recover_repair(task, host, queue_entry,
834 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000835 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000836 agent_tasks = self._recover_cleanup(task, host, queue_entry,
837 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000838 else:
839 # Should never happen
840 logging.error(
841 "Special task id %d had invalid task %s", (task.id, task.task))
842
843 self.add_agent(Agent(agent_tasks))
844
845
showarded2afea2009-07-07 20:54:07 +0000846 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000847 """\
848 Recovers a verify task.
849 No associated queue entry: Verify host
850 With associated queue entry: Verify host, and run associated queue
851 entry
852 """
853 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000854 return [VerifyTask(host=host, task=task,
855 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000856 else:
showarded2afea2009-07-07 20:54:07 +0000857 return [VerifyTask(queue_entry=queue_entry, task=task,
858 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000859 SetEntryPendingTask(queue_entry=queue_entry)]
860
861
showarded2afea2009-07-07 20:54:07 +0000862 def _recover_repair(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000863 """\
864 Recovers a repair task.
865 Always repair host
866 """
showarded2afea2009-07-07 20:54:07 +0000867 return [RepairTask(host=host, queue_entry=queue_entry, task=task,
868 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000869
870
showarded2afea2009-07-07 20:54:07 +0000871 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000872 """\
873 Recovers a cleanup task.
874 No associated queue entry: Clean host
875 With associated queue entry: Clean host, verify host if needed, and
876 run associated queue entry
877 """
878 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000879 return [CleanupTask(host=host, task=task,
880 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000881 else:
882 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000883 task=task,
884 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000885 if queue_entry.job.should_run_verify(queue_entry):
886 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
887 agent_tasks.append(
888 SetEntryPendingTask(queue_entry=queue_entry))
889 return agent_tasks
890
891
showard6af73ad2009-07-28 20:00:58 +0000892 def _requeue_starting_entries(self):
893 # temporary measure until we implement proper recovery of Starting HQEs
894 for entry in HostQueueEntry.fetch(where='status="Starting"'):
895 logging.info('Requeuing "Starting" queue entry %s' % entry)
896 assert not self.get_agents_for_entry(entry)
897 assert entry.host.status == models.Host.Status.PENDING
898 self._reverify_hosts_where('id = %s' % entry.host.id)
899 entry.requeue()
900
901
showard6878e8b2009-07-20 22:37:45 +0000902 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000903 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000904 where='active AND NOT complete AND '
905 '(aborted OR status != "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000906
showard2fe3f1d2009-07-06 20:19:11 +0000907 message = '\n'.join(str(entry) for entry in queue_entries
908 if not self.get_agents_for_entry(entry))
909 if message:
910 email_manager.manager.enqueue_notify_email(
911 'Unrecovered active host queue entries exist',
912 message)
showard170873e2009-01-07 00:22:26 +0000913
914
showard1ff7b2e2009-05-15 23:17:18 +0000915 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000916 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000917 task=models.SpecialTask.Task.VERIFY, is_active=False,
918 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000919
showard2fe3f1d2009-07-06 20:19:11 +0000920 for task in tasks:
921 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
922 if host.locked or host.invalid or self.host_has_agent(host):
923 continue
showard6d7b2ff2009-06-10 00:16:47 +0000924
showard2fe3f1d2009-07-06 20:19:11 +0000925 logging.info('Force reverifying host %s', host.hostname)
926 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000927
928
showard170873e2009-01-07 00:22:26 +0000929 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000930 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000931 # should never happen
showarded2afea2009-07-07 20:54:07 +0000932 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000933 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000934 self._reverify_hosts_where(
935 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
936 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000937
938
jadmanski0afbb632008-06-06 21:10:57 +0000939 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000940 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000941 full_where='locked = 0 AND invalid = 0 AND ' + where
942 for host in Host.fetch(where=full_where):
943 if self.host_has_agent(host):
944 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000945 continue
showard170873e2009-01-07 00:22:26 +0000946 if print_message:
showardb18134f2009-03-20 20:52:18 +0000947 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000948 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000949 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000950
951
jadmanski0afbb632008-06-06 21:10:57 +0000952 def _recover_hosts(self):
953 # recover "Repair Failed" hosts
954 message = 'Reverifying dead host %s'
955 self._reverify_hosts_where("status = 'Repair Failed'",
956 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000957
958
showard04c82c52008-05-29 19:38:12 +0000959
showardb95b1bd2008-08-15 18:11:04 +0000960 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000961 # prioritize by job priority, then non-metahost over metahost, then FIFO
962 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000963 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000964 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000965 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000966
967
showard89f84db2009-03-12 20:39:13 +0000968 def _refresh_pending_queue_entries(self):
969 """
970 Lookup the pending HostQueueEntries and call our HostScheduler
971 refresh() method given that list. Return the list.
972
973 @returns A list of pending HostQueueEntries sorted in priority order.
974 """
showard63a34772008-08-18 19:32:50 +0000975 queue_entries = self._get_pending_queue_entries()
976 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000977 return []
showardb95b1bd2008-08-15 18:11:04 +0000978
showard63a34772008-08-18 19:32:50 +0000979 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000980
showard89f84db2009-03-12 20:39:13 +0000981 return queue_entries
982
983
984 def _schedule_atomic_group(self, queue_entry):
985 """
986 Schedule the given queue_entry on an atomic group of hosts.
987
988 Returns immediately if there are insufficient available hosts.
989
990 Creates new HostQueueEntries based off of queue_entry for the
991 scheduled hosts and starts them all running.
992 """
993 # This is a virtual host queue entry representing an entire
994 # atomic group, find a group and schedule their hosts.
995 group_hosts = self._host_scheduler.find_eligible_atomic_group(
996 queue_entry)
997 if not group_hosts:
998 return
showardcbe6f942009-06-17 19:33:49 +0000999
1000 logging.info('Expanding atomic group entry %s with hosts %s',
1001 queue_entry,
1002 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001003 # The first assigned host uses the original HostQueueEntry
1004 group_queue_entries = [queue_entry]
1005 for assigned_host in group_hosts[1:]:
1006 # Create a new HQE for every additional assigned_host.
1007 new_hqe = HostQueueEntry.clone(queue_entry)
1008 new_hqe.save()
1009 group_queue_entries.append(new_hqe)
1010 assert len(group_queue_entries) == len(group_hosts)
1011 for queue_entry, host in itertools.izip(group_queue_entries,
1012 group_hosts):
1013 self._run_queue_entry(queue_entry, host)
1014
1015
1016 def _schedule_new_jobs(self):
1017 queue_entries = self._refresh_pending_queue_entries()
1018 if not queue_entries:
1019 return
1020
showard63a34772008-08-18 19:32:50 +00001021 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001022 if (queue_entry.atomic_group_id is None or
1023 queue_entry.host_id is not None):
1024 assigned_host = self._host_scheduler.find_eligible_host(
1025 queue_entry)
1026 if assigned_host:
1027 self._run_queue_entry(queue_entry, assigned_host)
1028 else:
1029 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001030
1031
    def _run_queue_entry(self, queue_entry, host):
        """Start queue_entry on host via its pre-job task agent."""
        # run_pre_job_tasks returns an Agent covering cleanup/verify and the
        # job itself
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001035
1036
jadmanski0afbb632008-06-06 21:10:57 +00001037 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001038 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001039 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001040 for agent in self.get_agents_for_entry(entry):
1041 agent.abort()
1042 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001043
1044
showard324bf812009-01-20 23:23:38 +00001045 def _can_start_agent(self, agent, num_started_this_cycle,
1046 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001047 # always allow zero-process agents to run
1048 if agent.num_processes == 0:
1049 return True
1050 # don't allow any nonzero-process agents to run after we've reached a
1051 # limit (this avoids starvation of many-process agents)
1052 if have_reached_limit:
1053 return False
1054 # total process throttling
showard324bf812009-01-20 23:23:38 +00001055 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001056 return False
1057 # if a single agent exceeds the per-cycle throttling, still allow it to
1058 # run when it's the first agent in the cycle
1059 if num_started_this_cycle == 0:
1060 return True
1061 # per-cycle throttling
1062 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001063 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001064 return False
1065 return True
1066
1067
jadmanski0afbb632008-06-06 21:10:57 +00001068 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001069 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001070 have_reached_limit = False
1071 # iterate over copy, so we can remove agents during iteration
1072 for agent in list(self._agents):
1073 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001074 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001075 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001076 continue
1077 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001078 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001079 have_reached_limit):
1080 have_reached_limit = True
1081 continue
showard4c5374f2008-09-04 17:02:56 +00001082 num_started_this_cycle += agent.num_processes
1083 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001084 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001085 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001086
1087
showard29f7cd22009-04-29 21:16:24 +00001088 def _process_recurring_runs(self):
1089 recurring_runs = models.RecurringRun.objects.filter(
1090 start_date__lte=datetime.datetime.now())
1091 for rrun in recurring_runs:
1092 # Create job from template
1093 job = rrun.job
1094 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001095 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001096
1097 host_objects = info['hosts']
1098 one_time_hosts = info['one_time_hosts']
1099 metahost_objects = info['meta_hosts']
1100 dependencies = info['dependencies']
1101 atomic_group = info['atomic_group']
1102
1103 for host in one_time_hosts or []:
1104 this_host = models.Host.create_one_time_host(host.hostname)
1105 host_objects.append(this_host)
1106
1107 try:
1108 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001109 options=options,
showard29f7cd22009-04-29 21:16:24 +00001110 host_objects=host_objects,
1111 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001112 atomic_group=atomic_group)
1113
1114 except Exception, ex:
1115 logging.exception(ex)
1116 #TODO send email
1117
1118 if rrun.loop_count == 1:
1119 rrun.delete()
1120 else:
1121 if rrun.loop_count != 0: # if not infinite loop
1122 # calculate new start_date
1123 difference = datetime.timedelta(seconds=rrun.loop_period)
1124 rrun.start_date = rrun.start_date + difference
1125 rrun.loop_count -= 1
1126 rrun.save()
1127
1128
showard170873e2009-01-07 00:22:26 +00001129class PidfileRunMonitor(object):
1130 """
1131 Client must call either run() to start a new process or
1132 attach_to_existing_process().
1133 """
mbligh36768f02008-02-22 18:28:33 +00001134
showard170873e2009-01-07 00:22:26 +00001135 class _PidfileException(Exception):
1136 """
1137 Raised when there's some unexpected behavior with the pid file, but only
1138 used internally (never allowed to escape this class).
1139 """
mbligh36768f02008-02-22 18:28:33 +00001140
1141
showard170873e2009-01-07 00:22:26 +00001142 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001143 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001144 self._start_time = None
1145 self.pidfile_id = None
1146 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001147
1148
showard170873e2009-01-07 00:22:26 +00001149 def _add_nice_command(self, command, nice_level):
1150 if not nice_level:
1151 return command
1152 return ['nice', '-n', str(nice_level)] + command
1153
1154
1155 def _set_start_time(self):
1156 self._start_time = time.time()
1157
1158
1159 def run(self, command, working_directory, nice_level=None, log_file=None,
1160 pidfile_name=None, paired_with_pidfile=None):
1161 assert command is not None
1162 if nice_level is not None:
1163 command = ['nice', '-n', str(nice_level)] + command
1164 self._set_start_time()
1165 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001166 command, working_directory, pidfile_name=pidfile_name,
1167 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001168
1169
showarded2afea2009-07-07 20:54:07 +00001170 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001171 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001172 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001173 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001174 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001175 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001176
1177
jadmanski0afbb632008-06-06 21:10:57 +00001178 def kill(self):
showard170873e2009-01-07 00:22:26 +00001179 if self.has_process():
1180 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001181
mbligh36768f02008-02-22 18:28:33 +00001182
showard170873e2009-01-07 00:22:26 +00001183 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001184 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001185 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001186
1187
showard170873e2009-01-07 00:22:26 +00001188 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001189 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001190 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001191 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001192
1193
showard170873e2009-01-07 00:22:26 +00001194 def _read_pidfile(self, use_second_read=False):
1195 assert self.pidfile_id is not None, (
1196 'You must call run() or attach_to_existing_process()')
1197 contents = _drone_manager.get_pidfile_contents(
1198 self.pidfile_id, use_second_read=use_second_read)
1199 if contents.is_invalid():
1200 self._state = drone_manager.PidfileContents()
1201 raise self._PidfileException(contents)
1202 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001203
1204
showard21baa452008-10-21 00:08:39 +00001205 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001206 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1207 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001208 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001209 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001210
1211
1212 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001213 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001214 return
mblighbb421852008-03-11 22:36:16 +00001215
showard21baa452008-10-21 00:08:39 +00001216 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001217
showard170873e2009-01-07 00:22:26 +00001218 if self._state.process is None:
1219 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001220 return
mbligh90a549d2008-03-25 23:52:34 +00001221
showard21baa452008-10-21 00:08:39 +00001222 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001223 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001224 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001225 return
mbligh90a549d2008-03-25 23:52:34 +00001226
showard170873e2009-01-07 00:22:26 +00001227 # pid but no running process - maybe process *just* exited
1228 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001229 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001230 # autoserv exited without writing an exit code
1231 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001232 self._handle_pidfile_error(
1233 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001234
showard21baa452008-10-21 00:08:39 +00001235
1236 def _get_pidfile_info(self):
1237 """\
1238 After completion, self._state will contain:
1239 pid=None, exit_status=None if autoserv has not yet run
1240 pid!=None, exit_status=None if autoserv is running
1241 pid!=None, exit_status!=None if autoserv has completed
1242 """
1243 try:
1244 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001245 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001246 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001247
1248
showard170873e2009-01-07 00:22:26 +00001249 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001250 """\
1251 Called when no pidfile is found or no pid is in the pidfile.
1252 """
showard170873e2009-01-07 00:22:26 +00001253 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001254 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1255 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001256 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001257 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001258
1259
showard35162b02009-03-03 02:17:30 +00001260 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001261 """\
1262 Called when autoserv has exited without writing an exit status,
1263 or we've timed out waiting for autoserv to write a pid to the
1264 pidfile. In either case, we just return failure and the caller
1265 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001266
showard170873e2009-01-07 00:22:26 +00001267 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001268 """
1269 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001270 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001271 self._state.exit_status = 1
1272 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001273
1274
jadmanski0afbb632008-06-06 21:10:57 +00001275 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001276 self._get_pidfile_info()
1277 return self._state.exit_status
1278
1279
1280 def num_tests_failed(self):
1281 self._get_pidfile_info()
1282 assert self._state.num_tests_failed is not None
1283 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001284
1285
mbligh36768f02008-02-22 18:28:33 +00001286class Agent(object):
showard77182562009-06-10 00:16:05 +00001287 """
1288 An agent for use by the Dispatcher class to perform a sequence of tasks.
1289
1290 The following methods are required on all task objects:
1291 poll() - Called periodically to let the task check its status and
1292 update its internal state. If the task succeeded.
1293 is_done() - Returns True if the task is finished.
1294 abort() - Called when an abort has been requested. The task must
1295 set its aborted attribute to True if it actually aborted.
1296
1297 The following attributes are required on all task objects:
1298 aborted - bool, True if this task was aborted.
1299 failure_tasks - A sequence of tasks to be run using a new Agent
1300 by the dispatcher should this task fail.
1301 success - bool, True if this task succeeded.
1302 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1303 host_ids - A sequence of Host ids this task represents.
1304
1305 The following attribute is written to all task objects:
1306 agent - A reference to the Agent instance that the task has been
1307 added to.
1308 """
1309
1310
showard170873e2009-01-07 00:22:26 +00001311 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001312 """
1313 @param tasks: A list of tasks as described in the class docstring.
1314 @param num_processes: The number of subprocesses the Agent represents.
1315 This is used by the Dispatcher for managing the load on the
1316 system. Defaults to 1.
1317 """
jadmanski0afbb632008-06-06 21:10:57 +00001318 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001319 self.queue = None
showard77182562009-06-10 00:16:05 +00001320 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001321 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001322 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001323
showard170873e2009-01-07 00:22:26 +00001324 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1325 for task in tasks)
1326 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1327
showardd3dc1992009-04-22 21:01:40 +00001328 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001329 for task in tasks:
1330 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001331
1332
showardd3dc1992009-04-22 21:01:40 +00001333 def _clear_queue(self):
1334 self.queue = Queue.Queue(0)
1335
1336
showard170873e2009-01-07 00:22:26 +00001337 def _union_ids(self, id_lists):
1338 return set(itertools.chain(*id_lists))
1339
1340
jadmanski0afbb632008-06-06 21:10:57 +00001341 def add_task(self, task):
1342 self.queue.put_nowait(task)
1343 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001344
1345
jadmanski0afbb632008-06-06 21:10:57 +00001346 def tick(self):
showard21baa452008-10-21 00:08:39 +00001347 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001348 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001349 self.active_task.poll()
1350 if not self.active_task.is_done():
1351 return
1352 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001353
1354
jadmanski0afbb632008-06-06 21:10:57 +00001355 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001356 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001357 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001358 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001359 if not self.active_task.success:
1360 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001361 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001362
jadmanski0afbb632008-06-06 21:10:57 +00001363 if not self.is_done():
1364 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001365
1366
jadmanski0afbb632008-06-06 21:10:57 +00001367 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001368 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001369 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1370 # get reset.
1371 new_agent = Agent(self.active_task.failure_tasks)
1372 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001373
mblighe2586682008-02-29 22:45:46 +00001374
showard4c5374f2008-09-04 17:02:56 +00001375 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001376 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001377
1378
jadmanski0afbb632008-06-06 21:10:57 +00001379 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001380 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001381
1382
showardd3dc1992009-04-22 21:01:40 +00001383 def abort(self):
showard08a36412009-05-05 01:01:13 +00001384 # abort tasks until the queue is empty or a task ignores the abort
1385 while not self.is_done():
1386 if not self.active_task:
1387 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001388 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001389 if not self.active_task.aborted:
1390 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001391 return
1392 self.active_task = None
1393
showardd3dc1992009-04-22 21:01:40 +00001394
showard77182562009-06-10 00:16:05 +00001395class DelayedCallTask(object):
1396 """
1397 A task object like AgentTask for an Agent to run that waits for the
1398 specified amount of time to have elapsed before calling the supplied
1399 callback once and finishing. If the callback returns anything, it is
1400 assumed to be a new Agent instance and will be added to the dispatcher.
1401
1402 @attribute end_time: The absolute posix time after which this task will
1403 call its callback when it is polled and be finished.
1404
1405 Also has all attributes required by the Agent class.
1406 """
1407 def __init__(self, delay_seconds, callback, now_func=None):
1408 """
1409 @param delay_seconds: The delay in seconds from now that this task
1410 will call the supplied callback and be done.
1411 @param callback: A callable to be called by this task once after at
1412 least delay_seconds time has elapsed. It must return None
1413 or a new Agent instance.
1414 @param now_func: A time.time like function. Default: time.time.
1415 Used for testing.
1416 """
1417 assert delay_seconds > 0
1418 assert callable(callback)
1419 if not now_func:
1420 now_func = time.time
1421 self._now_func = now_func
1422 self._callback = callback
1423
1424 self.end_time = self._now_func() + delay_seconds
1425
1426 # These attributes are required by Agent.
1427 self.aborted = False
1428 self.failure_tasks = ()
1429 self.host_ids = ()
1430 self.success = False
1431 self.queue_entry_ids = ()
1432 # This is filled in by Agent.add_task().
1433 self.agent = None
1434
1435
1436 def poll(self):
1437 if self._callback and self._now_func() >= self.end_time:
1438 new_agent = self._callback()
1439 if new_agent:
1440 self.agent.dispatcher.add_agent(new_agent)
1441 self._callback = None
1442 self.success = True
1443
1444
1445 def is_done(self):
1446 return not self._callback
1447
1448
1449 def abort(self):
1450 self.aborted = True
1451 self._callback = None
1452
1453
mbligh36768f02008-02-22 18:28:33 +00001454class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001455 def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
1456 pidfile_name=None, paired_with_pidfile=None,
1457 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001458 self.done = False
1459 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001460 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001461 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001462 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001463 self.monitor = recover_run_monitor
1464 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001465 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001466 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001467 self.queue_entry_ids = []
1468 self.host_ids = []
1469 self.log_file = None
1470
1471
1472 def _set_ids(self, host=None, queue_entries=None):
1473 if queue_entries and queue_entries != [None]:
1474 self.host_ids = [entry.host.id for entry in queue_entries]
1475 self.queue_entry_ids = [entry.id for entry in queue_entries]
1476 else:
1477 assert host
1478 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001479
1480
jadmanski0afbb632008-06-06 21:10:57 +00001481 def poll(self):
showard08a36412009-05-05 01:01:13 +00001482 if not self.started:
1483 self.start()
1484 self.tick()
1485
1486
1487 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001488 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001489 exit_code = self.monitor.exit_code()
1490 if exit_code is None:
1491 return
1492 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001493 else:
1494 success = False
mbligh36768f02008-02-22 18:28:33 +00001495
jadmanski0afbb632008-06-06 21:10:57 +00001496 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001497
1498
jadmanski0afbb632008-06-06 21:10:57 +00001499 def is_done(self):
1500 return self.done
mbligh36768f02008-02-22 18:28:33 +00001501
1502
jadmanski0afbb632008-06-06 21:10:57 +00001503 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001504 if self.done:
1505 return
jadmanski0afbb632008-06-06 21:10:57 +00001506 self.done = True
1507 self.success = success
1508 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001509
1510
jadmanski0afbb632008-06-06 21:10:57 +00001511 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001512 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001513
mbligh36768f02008-02-22 18:28:33 +00001514
jadmanski0afbb632008-06-06 21:10:57 +00001515 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001516 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001517 _drone_manager.copy_to_results_repository(
1518 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001519
1520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def epilog(self):
1522 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001523
1524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def start(self):
1526 assert self.agent
1527
1528 if not self.started:
1529 self.prolog()
1530 self.run()
1531
1532 self.started = True
1533
1534
1535 def abort(self):
1536 if self.monitor:
1537 self.monitor.kill()
1538 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001539 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001540 self.cleanup()
1541
1542
showarded2afea2009-07-07 20:54:07 +00001543 def _get_consistent_execution_path(self, execution_entries):
1544 first_execution_path = execution_entries[0].execution_path()
1545 for execution_entry in execution_entries[1:]:
1546 assert execution_entry.execution_path() == first_execution_path, (
1547 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1548 execution_entry,
1549 first_execution_path,
1550 execution_entries[0]))
1551 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001552
1553
showarded2afea2009-07-07 20:54:07 +00001554 def _copy_results(self, execution_entries, use_monitor=None):
1555 """
1556 @param execution_entries: list of objects with execution_path() method
1557 """
1558 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001559 if use_monitor is None:
1560 assert self.monitor
1561 use_monitor = self.monitor
1562 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001563 execution_path = self._get_consistent_execution_path(execution_entries)
1564 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001565 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001566 results_path)
showardde634ee2009-01-30 01:44:24 +00001567
showarda1e74b32009-05-12 17:32:04 +00001568
1569 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001570 reparse_task = FinalReparseTask(queue_entries)
1571 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1572
1573
showarda1e74b32009-05-12 17:32:04 +00001574 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1575 self._copy_results(queue_entries, use_monitor)
1576 self._parse_results(queue_entries)
1577
1578
showardd3dc1992009-04-22 21:01:40 +00001579 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001580 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001581 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001582 self.monitor = PidfileRunMonitor()
1583 self.monitor.run(self.cmd, self._working_directory,
1584 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001585 log_file=self.log_file,
1586 pidfile_name=pidfile_name,
1587 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001588
1589
showardd9205182009-04-27 20:09:55 +00001590class TaskWithJobKeyvals(object):
1591 """AgentTask mixin providing functionality to help with job keyval files."""
1592 _KEYVAL_FILE = 'keyval'
1593 def _format_keyval(self, key, value):
1594 return '%s=%s' % (key, value)
1595
1596
1597 def _keyval_path(self):
1598 """Subclasses must override this"""
1599 raise NotImplemented
1600
1601
1602 def _write_keyval_after_job(self, field, value):
1603 assert self.monitor
1604 if not self.monitor.has_process():
1605 return
1606 _drone_manager.write_lines_to_file(
1607 self._keyval_path(), [self._format_keyval(field, value)],
1608 paired_with_process=self.monitor.get_process())
1609
1610
1611 def _job_queued_keyval(self, job):
1612 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1613
1614
1615 def _write_job_finished(self):
1616 self._write_keyval_after_job("job_finished", int(time.time()))
1617
1618
showarded2afea2009-07-07 20:54:07 +00001619class SpecialAgentTask(AgentTask):
1620 """
1621 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1622 """
1623
1624 TASK_TYPE = None
1625 host = None
1626 queue_entry = None
1627
1628 def __init__(self, task, extra_command_args, **kwargs):
1629 assert self.host
1630 assert (self.TASK_TYPE is not None,
1631 'self.TASK_TYPE must be overridden')
1632 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001633 if task:
1634 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001635 self._extra_command_args = extra_command_args
1636 super(SpecialAgentTask, self).__init__(**kwargs)
1637
1638
1639 def prolog(self):
1640 super(SpecialAgentTask, self).prolog()
1641 self.task = models.SpecialTask.prepare(self, self.task)
1642 self.cmd = _autoserv_command_line(self.host.hostname,
1643 self._extra_command_args,
1644 queue_entry=self.queue_entry)
1645 self._working_directory = self.task.execution_path()
1646 self.task.activate()
1647
1648
showardb6681aa2009-07-08 21:15:00 +00001649 def cleanup(self):
1650 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001651
1652 # self.task can be None if a SpecialAgentTask is aborted before the
1653 # prolog runs
1654 if self.task:
1655 self.task.finish()
1656
1657 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001658 self._copy_results([self.task])
1659
1660
class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, host, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = None
        # recovery code can pass a HQE that's already been requeued. for a
        # metahost, that means the host has been unassigned. in that case,
        # ignore the HQE.
        hqe_still_assigned_to_this_host = (queue_entry and queue_entry.host
                                           and queue_entry.host.id == host.id)
        if hqe_still_assigned_to_this_host:
            self.queue_entry = queue_entry

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # put the entry back in the queue; it will be rescheduled once the
        # host is repaired (or failed by epilog if repair fails)
        if self.queue_entry:
            self.queue_entry.requeue()


    def _keyval_path(self):
        # keyvals for a repair live in the repair task's own execution dir
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != 'Queued':
            return # entry has been aborted

        # fabricate minimal job results so the failure shows up like a
        # normal (failed) job run
        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
                self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self._working_directory + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            # a failed repair also fails the queue entry that triggered it
            if self.queue_entry:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001741
1742
showarded2afea2009-07-07 20:54:07 +00001743class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001744 def epilog(self):
1745 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001746 should_copy_results = (self.queue_entry and not self.success
1747 and not self.queue_entry.meta_host)
1748 if should_copy_results:
1749 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001750 log_name = os.path.basename(self.task.execution_path())
1751 source = os.path.join(self.task.execution_path(), 'debug',
1752 'autoserv.DEBUG')
1753 destination = os.path.join(self.queue_entry.execution_path(),
1754 log_name)
showard170873e2009-01-07 00:22:26 +00001755 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001756 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001757 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001758
1759
class VerifyTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, queue_entry=None, host=None, task=None,
                 recover_run_monitor=None):
        """
        Verify a host, either standalone (host=...) or on behalf of a queue
        entry (queue_entry=...); exactly one of the two must be supplied.
        A failed verify triggers a RepairTask on the host.
        """
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(
                task, ['-v'], failure_tasks=failure_tasks,
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        # Delete any other queued verifies for this host. One verify will do
        # and there's no need to keep records of other requests.
        queued_verifies = models.SpecialTask.objects.filter(
            host__id=self.host.id,
            task=models.SpecialTask.Task.VERIFY,
            is_active=False, is_complete=False)
        queued_verifies = queued_verifies.exclude(id=self.task.id)
        queued_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        # note: failure handling (repair) happens via failure_tasks, so only
        # the success path is handled here
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001800
1801
showardb5626452009-06-30 01:57:28 +00001802class CleanupHostsMixin(object):
1803 def _reboot_hosts(self, job, queue_entries, final_success,
1804 num_tests_failed):
1805 reboot_after = job.reboot_after
1806 do_reboot = (
1807 # always reboot after aborted jobs
1808 self._final_status == models.HostQueueEntry.Status.ABORTED
1809 or reboot_after == models.RebootAfter.ALWAYS
1810 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1811 and final_success and num_tests_failed == 0))
1812
1813 for queue_entry in queue_entries:
1814 if do_reboot:
1815 # don't pass the queue entry to the CleanupTask. if the cleanup
1816 # fails, the job doesn't care -- it's over.
1817 cleanup_task = CleanupTask(host=queue_entry.host)
1818 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1819 else:
1820 queue_entry.host.set_status('Ready')
1821
1822
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    Task that runs the main autoserv process for a job's queue entries and
    performs the associated bookkeeping: keyval files, entry/host statuses,
    abort logging, and post-job follow-up tasks.
    """
    def __init__(self, job, queue_entries, cmd=None, group_name='',
                 recover_run_monitor=None):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(
                cmd, self._execution_path(),
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # job keyvals go in the job's execution directory
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # per-host keyvals (platform, labels) live under host_keyvals/<host>
        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_path(self):
        # all queue entries of a job share the first entry's execution path
        return self.queue_entries[0].execution_path()


    def prolog(self):
        # record when the job was queued (and the host group, if any) before
        # the job process starts
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1 and len(self.queue_entries) == 1:
            # TODO(gps): Remove this if nothing needs it anymore.
            # A potential user is: tko/parser
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            # normal case: schedule log gathering (which handles reboots)
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            # no process ever ran; handle host reboots/Ready directly
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
                aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001957
1958
showardd3dc1992009-04-22 21:01:40 +00001959class PostJobTask(AgentTask):
1960 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001961 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001962 self._queue_entries = queue_entries
1963 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001964
showarded2afea2009-07-07 20:54:07 +00001965 self._execution_path = self._get_consistent_execution_path(
1966 queue_entries)
1967 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001968 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001969 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001970 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1971
1972 if _testing_mode:
1973 command = 'true'
1974 else:
1975 command = self._generate_command(self._results_dir)
1976
showarded2afea2009-07-07 20:54:07 +00001977 super(PostJobTask, self).__init__(
1978 cmd=command, working_directory=self._execution_path,
1979 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001980
showarded2afea2009-07-07 20:54:07 +00001981 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001982 self._final_status = self._determine_final_status()
1983
1984
1985 def _generate_command(self, results_dir):
1986 raise NotImplementedError('Subclasses must override this')
1987
1988
1989 def _job_was_aborted(self):
1990 was_aborted = None
1991 for queue_entry in self._queue_entries:
1992 queue_entry.update_from_database()
1993 if was_aborted is None: # first queue entry
1994 was_aborted = bool(queue_entry.aborted)
1995 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1996 email_manager.manager.enqueue_notify_email(
1997 'Inconsistent abort state',
1998 'Queue entries have inconsistent abort state: ' +
1999 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
2000 # don't crash here, just assume true
2001 return True
2002 return was_aborted
2003
2004
2005 def _determine_final_status(self):
2006 if self._job_was_aborted():
2007 return models.HostQueueEntry.Status.ABORTED
2008
2009 # we'll use a PidfileRunMonitor to read the autoserv exit status
2010 if self._autoserv_monitor.exit_code() == 0:
2011 return models.HostQueueEntry.Status.COMPLETED
2012 return models.HostQueueEntry.Status.FAILED
2013
2014
2015 def run(self):
showard5add1c82009-05-26 19:27:46 +00002016 # make sure we actually have results to work with.
2017 # this should never happen in normal operation.
2018 if not self._autoserv_monitor.has_process():
2019 email_manager.manager.enqueue_notify_email(
2020 'No results in post-job task',
2021 'No results in post-job task at %s' %
2022 self._autoserv_monitor.pidfile_id)
2023 self.finished(False)
2024 return
2025
2026 super(PostJobTask, self).run(
2027 pidfile_name=self._pidfile_name,
2028 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002029
2030
2031 def _set_all_statuses(self, status):
2032 for queue_entry in self._queue_entries:
2033 queue_entry.set_status(status)
2034
2035
2036 def abort(self):
2037 # override AgentTask.abort() to avoid killing the process and ending
2038 # the task. post-job tasks continue when the job is aborted.
2039 pass
2040
2041
showardb5626452009-06-30 01:57:28 +00002042class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002043 """
2044 Task responsible for
2045 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2046 * copying logs to the results repository
2047 * spawning CleanupTasks for hosts, if necessary
2048 * spawning a FinalReparseTask for the job
2049 """
showarded2afea2009-07-07 20:54:07 +00002050 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002051 self._job = job
2052 super(GatherLogsTask, self).__init__(
2053 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002054 logfile_name='.collect_crashinfo.log',
2055 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002056 self._set_ids(queue_entries=queue_entries)
2057
2058
2059 def _generate_command(self, results_dir):
2060 host_list = ','.join(queue_entry.host.hostname
2061 for queue_entry in self._queue_entries)
2062 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2063 '-r', results_dir]
2064
2065
2066 def prolog(self):
2067 super(GatherLogsTask, self).prolog()
2068 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2069
2070
showardd3dc1992009-04-22 21:01:40 +00002071 def epilog(self):
2072 super(GatherLogsTask, self).epilog()
showardebc0fb72009-05-13 21:28:07 +00002073 if self._autoserv_monitor.has_process():
2074 self._copy_and_parse_results(self._queue_entries,
2075 use_monitor=self._autoserv_monitor)
showardb5626452009-06-30 01:57:28 +00002076
2077 final_success = (
2078 self._final_status == models.HostQueueEntry.Status.COMPLETED)
2079 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2080 self._reboot_hosts(self._job, self._queue_entries, final_success,
2081 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002082
2083
showard0bbfc212009-04-29 21:06:13 +00002084 def run(self):
showard597bfd32009-05-08 18:22:50 +00002085 autoserv_exit_code = self._autoserv_monitor.exit_code()
2086 # only run if Autoserv exited due to some signal. if we have no exit
2087 # code, assume something bad (and signal-like) happened.
2088 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002089 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002090 else:
2091 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002092
2093
showard8fe93b52008-11-18 17:53:22 +00002094class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002095 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2096
2097
2098 def __init__(self, host=None, queue_entry=None, task=None,
2099 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002100 assert bool(host) ^ bool(queue_entry)
2101 if queue_entry:
2102 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002103 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002104 self.host = host
showard170873e2009-01-07 00:22:26 +00002105
showarde788ea62008-11-17 21:02:47 +00002106 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002107 super(CleanupTask, self).__init__(
2108 task, ['--cleanup'], failure_tasks=[repair_task],
2109 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002110
2111 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002112
mblighd5c95802008-03-05 00:33:46 +00002113
jadmanski0afbb632008-06-06 21:10:57 +00002114 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002115 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002116 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002117 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002118
mblighd5c95802008-03-05 00:33:46 +00002119
showard21baa452008-10-21 00:08:39 +00002120 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002121 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002122
showard21baa452008-10-21 00:08:39 +00002123 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002124 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002125 self.host.update_field('dirty', 0)
2126
2127
showardd3dc1992009-04-22 21:01:40 +00002128class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002129 _num_running_parses = 0
2130
showarded2afea2009-07-07 20:54:07 +00002131 def __init__(self, queue_entries, recover_run_monitor=None):
2132 super(FinalReparseTask, self).__init__(
2133 queue_entries, pidfile_name=_PARSER_PID_FILE,
2134 logfile_name='.parse.log',
2135 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002136 # don't use _set_ids, since we don't want to set the host_ids
2137 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002138 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002139
showard97aed502008-11-04 02:01:24 +00002140
2141 @classmethod
2142 def _increment_running_parses(cls):
2143 cls._num_running_parses += 1
2144
2145
2146 @classmethod
2147 def _decrement_running_parses(cls):
2148 cls._num_running_parses -= 1
2149
2150
2151 @classmethod
2152 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002153 return (cls._num_running_parses <
2154 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002155
2156
2157 def prolog(self):
2158 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002159 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002160
2161
2162 def epilog(self):
2163 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002164 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002165
2166
showardd3dc1992009-04-22 21:01:40 +00002167 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002168 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002169 results_dir]
showard97aed502008-11-04 02:01:24 +00002170
2171
showard08a36412009-05-05 01:01:13 +00002172 def tick(self):
2173 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002174 # and we can, at which point we revert to default behavior
2175 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002176 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002177 else:
2178 self._try_starting_parse()
2179
2180
2181 def run(self):
2182 # override run() to not actually run unless we can
2183 self._try_starting_parse()
2184
2185
2186 def _try_starting_parse(self):
2187 if not self._can_run_new_parse():
2188 return
showard170873e2009-01-07 00:22:26 +00002189
showard97aed502008-11-04 02:01:24 +00002190 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002191 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002192
showard97aed502008-11-04 02:01:24 +00002193 self._increment_running_parses()
2194 self._parse_started = True
2195
2196
2197 def finished(self, success):
2198 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002199 if self._parse_started:
2200 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002201
2202
showardc9ae1782009-01-30 01:42:37 +00002203class SetEntryPendingTask(AgentTask):
2204 def __init__(self, queue_entry):
2205 super(SetEntryPendingTask, self).__init__(cmd='')
2206 self._queue_entry = queue_entry
2207 self._set_ids(queue_entries=[queue_entry])
2208
2209
2210 def run(self):
2211 agent = self._queue_entry.on_pending()
2212 if agent:
2213 self.agent.dispatcher.add_agent(agent)
2214 self.finished(True)
2215
2216
showarda3c58572009-03-12 20:36:59 +00002217class DBError(Exception):
2218 """Raised by the DBObject constructor when its select fails."""
2219
2220
mbligh36768f02008-02-22 18:28:33 +00002221class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002222 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002223
2224 # Subclasses MUST override these:
2225 _table_name = ''
2226 _fields = ()
2227
showarda3c58572009-03-12 20:36:59 +00002228 # A mapping from (type, id) to the instance of the object for that
2229 # particular id. This prevents us from creating new Job() and Host()
2230 # instances for every HostQueueEntry object that we instantiate as
2231 # multiple HQEs often share the same Job.
2232 _instances_by_type_and_id = weakref.WeakValueDictionary()
2233 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002234
showarda3c58572009-03-12 20:36:59 +00002235
2236 def __new__(cls, id=None, **kwargs):
2237 """
2238 Look to see if we already have an instance for this particular type
2239 and id. If so, use it instead of creating a duplicate instance.
2240 """
2241 if id is not None:
2242 instance = cls._instances_by_type_and_id.get((cls, id))
2243 if instance:
2244 return instance
2245 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2246
2247
2248 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002249 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002250 assert self._table_name, '_table_name must be defined in your class'
2251 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002252 if not new_record:
2253 if self._initialized and not always_query:
2254 return # We've already been initialized.
2255 if id is None:
2256 id = row[0]
2257 # Tell future constructors to use us instead of re-querying while
2258 # this instance is still around.
2259 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002260
showard6ae5ea92009-02-25 00:11:51 +00002261 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002262
jadmanski0afbb632008-06-06 21:10:57 +00002263 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002264
jadmanski0afbb632008-06-06 21:10:57 +00002265 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002266 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002267
showarda3c58572009-03-12 20:36:59 +00002268 if self._initialized:
2269 differences = self._compare_fields_in_row(row)
2270 if differences:
showard7629f142009-03-27 21:02:02 +00002271 logging.warn(
2272 'initialized %s %s instance requery is updating: %s',
2273 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002274 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002275 self._initialized = True
2276
2277
2278 @classmethod
2279 def _clear_instance_cache(cls):
2280 """Used for testing, clear the internal instance cache."""
2281 cls._instances_by_type_and_id.clear()
2282
2283
showardccbd6c52009-03-21 00:10:21 +00002284 def _fetch_row_from_db(self, row_id):
2285 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2286 rows = _db.execute(sql, (row_id,))
2287 if not rows:
showard76e29d12009-04-15 21:53:10 +00002288 raise DBError("row not found (table=%s, row id=%s)"
2289 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002290 return rows[0]
2291
2292
showarda3c58572009-03-12 20:36:59 +00002293 def _assert_row_length(self, row):
2294 assert len(row) == len(self._fields), (
2295 "table = %s, row = %s/%d, fields = %s/%d" % (
2296 self.__table, row, len(row), self._fields, len(self._fields)))
2297
2298
2299 def _compare_fields_in_row(self, row):
2300 """
2301 Given a row as returned by a SELECT query, compare it to our existing
2302 in memory fields.
2303
2304 @param row - A sequence of values corresponding to fields named in
2305 The class attribute _fields.
2306
2307 @returns A dictionary listing the differences keyed by field name
2308 containing tuples of (current_value, row_value).
2309 """
2310 self._assert_row_length(row)
2311 differences = {}
2312 for field, row_value in itertools.izip(self._fields, row):
2313 current_value = getattr(self, field)
2314 if current_value != row_value:
2315 differences[field] = (current_value, row_value)
2316 return differences
showard2bab8f42008-11-12 18:15:22 +00002317
2318
2319 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002320 """
2321 Update our field attributes using a single row returned by SELECT.
2322
2323 @param row - A sequence of values corresponding to fields named in
2324 the class fields list.
2325 """
2326 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002327
showard2bab8f42008-11-12 18:15:22 +00002328 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002329 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002330 setattr(self, field, value)
2331 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002332
showard2bab8f42008-11-12 18:15:22 +00002333 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002334
mblighe2586682008-02-29 22:45:46 +00002335
showardccbd6c52009-03-21 00:10:21 +00002336 def update_from_database(self):
2337 assert self.id is not None
2338 row = self._fetch_row_from_db(self.id)
2339 self._update_fields_from_row(row)
2340
2341
jadmanski0afbb632008-06-06 21:10:57 +00002342 def count(self, where, table = None):
2343 if not table:
2344 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002345
jadmanski0afbb632008-06-06 21:10:57 +00002346 rows = _db.execute("""
2347 SELECT count(*) FROM %s
2348 WHERE %s
2349 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002350
jadmanski0afbb632008-06-06 21:10:57 +00002351 assert len(rows) == 1
2352
2353 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002354
2355
showardd3dc1992009-04-22 21:01:40 +00002356 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002357 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002358
showard2bab8f42008-11-12 18:15:22 +00002359 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002360 return
mbligh36768f02008-02-22 18:28:33 +00002361
mblighf8c624d2008-07-03 16:58:45 +00002362 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002363 _db.execute(query, (value, self.id))
2364
showard2bab8f42008-11-12 18:15:22 +00002365 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002366
2367
jadmanski0afbb632008-06-06 21:10:57 +00002368 def save(self):
2369 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002370 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002371 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002372 values = []
2373 for key in keys:
2374 value = getattr(self, key)
2375 if value is None:
2376 values.append('NULL')
2377 else:
2378 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002379 values_str = ','.join(values)
2380 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2381 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002382 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002383 # Update our id to the one the database just assigned to us.
2384 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002385
2386
jadmanski0afbb632008-06-06 21:10:57 +00002387 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002388 self._instances_by_type_and_id.pop((type(self), id), None)
2389 self._initialized = False
2390 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002391 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2392 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002393
2394
showard63a34772008-08-18 19:32:50 +00002395 @staticmethod
2396 def _prefix_with(string, prefix):
2397 if string:
2398 string = prefix + string
2399 return string
2400
2401
jadmanski0afbb632008-06-06 21:10:57 +00002402 @classmethod
showard989f25d2008-10-01 11:38:11 +00002403 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002404 """
2405 Construct instances of our class based on the given database query.
2406
2407 @yields One class instance for each row fetched.
2408 """
showard63a34772008-08-18 19:32:50 +00002409 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2410 where = cls._prefix_with(where, 'WHERE ')
2411 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002412 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002413 'joins' : joins,
2414 'where' : where,
2415 'order_by' : order_by})
2416 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002417 for row in rows:
2418 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002419
mbligh36768f02008-02-22 18:28:33 +00002420
class IneligibleHostQueue(DBObject):
    # ORM wrapper for ineligible_host_queues: one row per (job, host) pair
    # that must not be scheduled together (see HostQueueEntry.block_host).
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002424
2425
showard89f84db2009-03-12 20:39:13 +00002426class AtomicGroup(DBObject):
2427 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002428 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2429 'invalid')
showard89f84db2009-03-12 20:39:13 +00002430
2431
showard989f25d2008-10-01 11:38:11 +00002432class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002433 _table_name = 'labels'
2434 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002435 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002436
2437
showard6157c632009-07-06 20:19:31 +00002438 def __repr__(self):
2439 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2440 self.name, self.id, self.atomic_group_id)
2441
2442
mbligh36768f02008-02-22 18:28:33 +00002443class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002444 _table_name = 'hosts'
2445 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2446 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2447
2448
jadmanski0afbb632008-06-06 21:10:57 +00002449 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002450 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002451 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002452
2453
showard170873e2009-01-07 00:22:26 +00002454 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002455 """
showard170873e2009-01-07 00:22:26 +00002456 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002457 """
2458 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002459 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002460 FROM labels
2461 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002462 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002463 ORDER BY labels.name
2464 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002465 platform = None
2466 all_labels = []
2467 for label_name, is_platform in rows:
2468 if is_platform:
2469 platform = label_name
2470 all_labels.append(label_name)
2471 return platform, all_labels
2472
2473
showard2fe3f1d2009-07-06 20:19:11 +00002474 def reverify_tasks(self):
2475 cleanup_task = CleanupTask(host=self)
2476 verify_task = VerifyTask(host=self)
2477
showard6d7b2ff2009-06-10 00:16:47 +00002478 # just to make sure this host does not get taken away
showard2fe3f1d2009-07-06 20:19:11 +00002479 self.set_status('Cleaning')
2480 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002481
2482
showard54c1ea92009-05-20 00:32:58 +00002483 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2484
2485
2486 @classmethod
2487 def cmp_for_sort(cls, a, b):
2488 """
2489 A comparison function for sorting Host objects by hostname.
2490
2491 This strips any trailing numeric digits, ignores leading 0s and
2492 compares hostnames by the leading name and the trailing digits as a
2493 number. If both hostnames do not match this pattern, they are simply
2494 compared as lower case strings.
2495
2496 Example of how hostnames will be sorted:
2497
2498 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2499
2500 This hopefully satisfy most people's hostname sorting needs regardless
2501 of their exact naming schemes. Nobody sane should have both a host10
2502 and host010 (but the algorithm works regardless).
2503 """
2504 lower_a = a.hostname.lower()
2505 lower_b = b.hostname.lower()
2506 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2507 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2508 if match_a and match_b:
2509 name_a, number_a_str = match_a.groups()
2510 name_b, number_b_str = match_b.groups()
2511 number_a = int(number_a_str.lstrip('0'))
2512 number_b = int(number_b_str.lstrip('0'))
2513 result = cmp((name_a, number_a), (name_b, number_b))
2514 if result == 0 and lower_a != lower_b:
2515 # If they compared equal above but the lower case names are
2516 # indeed different, don't report equality. abc012 != abc12.
2517 return cmp(lower_a, lower_b)
2518 return result
2519 else:
2520 return cmp(lower_a, lower_b)
2521
2522
mbligh36768f02008-02-22 18:28:33 +00002523class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002524 _table_name = 'host_queue_entries'
2525 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002526 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002527 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002528
2529
showarda3c58572009-03-12 20:36:59 +00002530 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002531 assert id or row
showarda3c58572009-03-12 20:36:59 +00002532 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002533 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002534
jadmanski0afbb632008-06-06 21:10:57 +00002535 if self.host_id:
2536 self.host = Host(self.host_id)
2537 else:
2538 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002539
showard77182562009-06-10 00:16:05 +00002540 if self.atomic_group_id:
2541 self.atomic_group = AtomicGroup(self.atomic_group_id,
2542 always_query=False)
2543 else:
2544 self.atomic_group = None
2545
showard170873e2009-01-07 00:22:26 +00002546 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002547 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002548
2549
showard89f84db2009-03-12 20:39:13 +00002550 @classmethod
2551 def clone(cls, template):
2552 """
2553 Creates a new row using the values from a template instance.
2554
2555 The new instance will not exist in the database or have a valid
2556 id attribute until its save() method is called.
2557 """
2558 assert isinstance(template, cls)
2559 new_row = [getattr(template, field) for field in cls._fields]
2560 clone = cls(row=new_row, new_record=True)
2561 clone.id = None
2562 return clone
2563
2564
showardc85c21b2008-11-24 22:17:37 +00002565 def _view_job_url(self):
2566 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2567
2568
showardf1ae3542009-05-11 19:26:02 +00002569 def get_labels(self):
2570 """
2571 Get all labels associated with this host queue entry (either via the
2572 meta_host or as a job dependency label). The labels yielded are not
2573 guaranteed to be unique.
2574
2575 @yields Label instances associated with this host_queue_entry.
2576 """
2577 if self.meta_host:
2578 yield Label(id=self.meta_host, always_query=False)
2579 labels = Label.fetch(
2580 joins="JOIN jobs_dependency_labels AS deps "
2581 "ON (labels.id = deps.label_id)",
2582 where="deps.job_id = %d" % self.job.id)
2583 for label in labels:
2584 yield label
2585
2586
jadmanski0afbb632008-06-06 21:10:57 +00002587 def set_host(self, host):
2588 if host:
2589 self.queue_log_record('Assigning host ' + host.hostname)
2590 self.update_field('host_id', host.id)
2591 self.update_field('active', True)
2592 self.block_host(host.id)
2593 else:
2594 self.queue_log_record('Releasing host')
2595 self.unblock_host(self.host.id)
2596 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002597
jadmanski0afbb632008-06-06 21:10:57 +00002598 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002599
2600
jadmanski0afbb632008-06-06 21:10:57 +00002601 def get_host(self):
2602 return self.host
mbligh36768f02008-02-22 18:28:33 +00002603
2604
jadmanski0afbb632008-06-06 21:10:57 +00002605 def queue_log_record(self, log_line):
2606 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002607 _drone_manager.write_lines_to_file(self.queue_log_path,
2608 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002609
2610
jadmanski0afbb632008-06-06 21:10:57 +00002611 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002612 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002613 row = [0, self.job.id, host_id]
2614 block = IneligibleHostQueue(row=row, new_record=True)
2615 block.save()
mblighe2586682008-02-29 22:45:46 +00002616
2617
jadmanski0afbb632008-06-06 21:10:57 +00002618 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002619 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002620 blocks = IneligibleHostQueue.fetch(
2621 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2622 for block in blocks:
2623 block.delete()
mblighe2586682008-02-29 22:45:46 +00002624
2625
showard2bab8f42008-11-12 18:15:22 +00002626 def set_execution_subdir(self, subdir=None):
2627 if subdir is None:
2628 assert self.get_host()
2629 subdir = self.get_host().hostname
2630 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002631
2632
showard6355f6b2008-12-05 18:52:13 +00002633 def _get_hostname(self):
2634 if self.host:
2635 return self.host.hostname
2636 return 'no host'
2637
2638
showard170873e2009-01-07 00:22:26 +00002639 def __str__(self):
2640 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2641
2642
jadmanski0afbb632008-06-06 21:10:57 +00002643 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002644 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002645
showardb18134f2009-03-20 20:52:18 +00002646 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002647
showardc85c21b2008-11-24 22:17:37 +00002648 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002649 self.update_field('complete', False)
2650 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002651
jadmanski0afbb632008-06-06 21:10:57 +00002652 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002653 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002654 self.update_field('complete', False)
2655 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002656
showardc85c21b2008-11-24 22:17:37 +00002657 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002658 self.update_field('complete', True)
2659 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002660
2661 should_email_status = (status.lower() in _notify_email_statuses or
2662 'all' in _notify_email_statuses)
2663 if should_email_status:
2664 self._email_on_status(status)
2665
2666 self._email_on_job_complete()
2667
2668
    def _email_on_status(self, status):
        """Send a status-change notification email for this entry."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002678
2679
    def _email_on_job_complete(self):
        """
        If the whole job has finished, email a per-host status summary to
        the job's email list.  Does nothing while the job is still running.
        """
        if not self.job.is_finished():
            return

        # one summary line per queue entry of this job
        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        # aggregate counts, e.g. "3 Completed, 1 Failed"
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002703
2704
showard77182562009-06-10 00:16:05 +00002705 def run_pre_job_tasks(self, assigned_host=None):
2706 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002707 assert assigned_host
2708 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002709 if self.host_id is None:
2710 self.set_host(assigned_host)
2711 else:
2712 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002713
showardcfd4a7e2009-07-11 01:47:33 +00002714 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002715 self.job.name, self.meta_host, self.atomic_group_id,
2716 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002717
showard77182562009-06-10 00:16:05 +00002718 return self._do_run_pre_job_tasks()
2719
2720
2721 def _do_run_pre_job_tasks(self):
2722 # Every host goes thru the Verifying stage (which may or may not
2723 # actually do anything as determined by get_pre_job_tasks).
2724 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2725
2726 # The pre job tasks always end with a SetEntryPendingTask which
2727 # will continue as appropriate through queue_entry.on_pending().
2728 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002729
showard6ae5ea92009-02-25 00:11:51 +00002730
jadmanski0afbb632008-06-06 21:10:57 +00002731 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002732 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002733 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002734 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002735 # verify/cleanup failure sets the execution subdir, so reset it here
2736 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002737 if self.meta_host:
2738 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002739
2740
jadmanski0afbb632008-06-06 21:10:57 +00002741 def handle_host_failure(self):
2742 """\
2743 Called when this queue entry's host has failed verification and
2744 repair.
2745 """
2746 assert not self.meta_host
2747 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002748 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002749
2750
jadmanskif7fa2cc2008-10-01 14:13:23 +00002751 @property
2752 def aborted_by(self):
2753 self._load_abort_info()
2754 return self._aborted_by
2755
2756
2757 @property
2758 def aborted_on(self):
2759 self._load_abort_info()
2760 return self._aborted_on
2761
2762
2763 def _load_abort_info(self):
2764 """ Fetch info about who aborted the job. """
2765 if hasattr(self, "_aborted_by"):
2766 return
2767 rows = _db.execute("""
2768 SELECT users.login, aborted_host_queue_entries.aborted_on
2769 FROM aborted_host_queue_entries
2770 INNER JOIN users
2771 ON users.id = aborted_host_queue_entries.aborted_by_id
2772 WHERE aborted_host_queue_entries.queue_entry_id = %s
2773 """, (self.id,))
2774 if rows:
2775 self._aborted_by, self._aborted_on = rows[0]
2776 else:
2777 self._aborted_by = self._aborted_on = None
2778
2779
showardb2e2c322008-10-14 17:33:55 +00002780 def on_pending(self):
2781 """
2782 Called when an entry in a synchronous job has passed verify. If the
2783 job is ready to run, returns an agent to run the job. Returns None
2784 otherwise.
2785 """
2786 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002787 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002788
2789 # Some debug code here: sends an email if an asynchronous job does not
2790 # immediately enter Starting.
2791 # TODO: Remove this once we figure out why asynchronous jobs are getting
2792 # stuck in Pending.
2793 agent = self.job.run_if_ready(queue_entry=self)
2794 if self.job.synch_count == 1 and agent is None:
2795 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2796 message = 'Asynchronous job stuck in Pending'
2797 email_manager.manager.enqueue_notify_email(subject, message)
2798 return agent
showardb2e2c322008-10-14 17:33:55 +00002799
2800
showardd3dc1992009-04-22 21:01:40 +00002801 def abort(self, dispatcher):
2802 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002803
showardd3dc1992009-04-22 21:01:40 +00002804 Status = models.HostQueueEntry.Status
2805 has_running_job_agent = (
2806 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2807 and dispatcher.get_agents_for_entry(self))
2808 if has_running_job_agent:
2809 # do nothing; post-job tasks will finish and then mark this entry
2810 # with status "Aborted" and take care of the host
2811 return
2812
2813 if self.status in (Status.STARTING, Status.PENDING):
2814 self.host.set_status(models.Host.Status.READY)
2815 elif self.status == Status.VERIFYING:
2816 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2817
2818 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002819
2820 def execution_tag(self):
2821 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002822 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002823
2824
showarded2afea2009-07-07 20:54:07 +00002825 def execution_path(self):
2826 return self.execution_tag()
2827
2828
mbligh36768f02008-02-22 18:28:33 +00002829class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002830 _table_name = 'jobs'
2831 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2832 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002833 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002834 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002835
showard77182562009-06-10 00:16:05 +00002836 # This does not need to be a column in the DB. The delays are likely to
2837 # be configured short. If the scheduler is stopped and restarted in
2838 # the middle of a job's delay cycle, the delay cycle will either be
2839 # repeated or skipped depending on the number of Pending machines found
2840 # when the restarted scheduler recovers to track it. Not a problem.
2841 #
2842 # A reference to the DelayedCallTask that will wake up the job should
2843 # no other HQEs change state in time. Its end_time attribute is used
2844 # by our run_with_ready_delay() method to determine if the wait is over.
2845 _delay_ready_task = None
2846
2847 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2848 # all status='Pending' atomic group HQEs incase a delay was running when the
2849 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002850
showarda3c58572009-03-12 20:36:59 +00002851 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002852 assert id or row
showarda3c58572009-03-12 20:36:59 +00002853 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002854
mblighe2586682008-02-29 22:45:46 +00002855
jadmanski0afbb632008-06-06 21:10:57 +00002856 def is_server_job(self):
2857 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002858
2859
showard170873e2009-01-07 00:22:26 +00002860 def tag(self):
2861 return "%s-%s" % (self.id, self.owner)
2862
2863
jadmanski0afbb632008-06-06 21:10:57 +00002864 def get_host_queue_entries(self):
2865 rows = _db.execute("""
2866 SELECT * FROM host_queue_entries
2867 WHERE job_id= %s
2868 """, (self.id,))
2869 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002870
jadmanski0afbb632008-06-06 21:10:57 +00002871 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002872
jadmanski0afbb632008-06-06 21:10:57 +00002873 return entries
mbligh36768f02008-02-22 18:28:33 +00002874
2875
jadmanski0afbb632008-06-06 21:10:57 +00002876 def set_status(self, status, update_queues=False):
2877 self.update_field('status',status)
2878
2879 if update_queues:
2880 for queue_entry in self.get_host_queue_entries():
2881 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002882
2883
showard77182562009-06-10 00:16:05 +00002884 def _atomic_and_has_started(self):
2885 """
2886 @returns True if any of the HostQueueEntries associated with this job
2887 have entered the Status.STARTING state or beyond.
2888 """
2889 atomic_entries = models.HostQueueEntry.objects.filter(
2890 job=self.id, atomic_group__isnull=False)
2891 if atomic_entries.count() <= 0:
2892 return False
2893
showardaf8b4ca2009-06-16 18:47:26 +00002894 # These states may *only* be reached if Job.run() has been called.
2895 started_statuses = (models.HostQueueEntry.Status.STARTING,
2896 models.HostQueueEntry.Status.RUNNING,
2897 models.HostQueueEntry.Status.COMPLETED)
2898
2899 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002900 return started_entries.count() > 0
2901
2902
2903 def _pending_count(self):
2904 """The number of HostQueueEntries for this job in the Pending state."""
2905 pending_entries = models.HostQueueEntry.objects.filter(
2906 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2907 return pending_entries.count()
2908
2909
jadmanski0afbb632008-06-06 21:10:57 +00002910 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002911 # NOTE: Atomic group jobs stop reporting ready after they have been
2912 # started to avoid launching multiple copies of one atomic job.
2913 # Only possible if synch_count is less than than half the number of
2914 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00002915 pending_count = self._pending_count()
2916 atomic_and_has_started = self._atomic_and_has_started()
2917 ready = (pending_count >= self.synch_count
2918 and not self._atomic_and_has_started())
2919
2920 if not ready:
2921 logging.info(
2922 'Job %s not ready: %s pending, %s required '
2923 '(Atomic and started: %s)',
2924 self, pending_count, self.synch_count,
2925 atomic_and_has_started)
2926
2927 return ready
mbligh36768f02008-02-22 18:28:33 +00002928
2929
jadmanski0afbb632008-06-06 21:10:57 +00002930 def num_machines(self, clause = None):
2931 sql = "job_id=%s" % self.id
2932 if clause:
2933 sql += " AND (%s)" % clause
2934 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002935
2936
jadmanski0afbb632008-06-06 21:10:57 +00002937 def num_queued(self):
2938 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002939
2940
jadmanski0afbb632008-06-06 21:10:57 +00002941 def num_active(self):
2942 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002943
2944
jadmanski0afbb632008-06-06 21:10:57 +00002945 def num_complete(self):
2946 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002947
2948
jadmanski0afbb632008-06-06 21:10:57 +00002949 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002950 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002951
mbligh36768f02008-02-22 18:28:33 +00002952
showard6bb7c292009-01-30 01:44:51 +00002953 def _not_yet_run_entries(self, include_verifying=True):
2954 statuses = [models.HostQueueEntry.Status.QUEUED,
2955 models.HostQueueEntry.Status.PENDING]
2956 if include_verifying:
2957 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2958 return models.HostQueueEntry.objects.filter(job=self.id,
2959 status__in=statuses)
2960
2961
2962 def _stop_all_entries(self):
2963 entries_to_stop = self._not_yet_run_entries(
2964 include_verifying=False)
2965 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002966 assert not child_entry.complete, (
2967 '%s status=%s, active=%s, complete=%s' %
2968 (child_entry.id, child_entry.status, child_entry.active,
2969 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002970 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2971 child_entry.host.status = models.Host.Status.READY
2972 child_entry.host.save()
2973 child_entry.status = models.HostQueueEntry.Status.STOPPED
2974 child_entry.save()
2975
showard2bab8f42008-11-12 18:15:22 +00002976 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002977 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002978 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002979 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002980
2981
jadmanski0afbb632008-06-06 21:10:57 +00002982 def write_to_machines_file(self, queue_entry):
2983 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002984 file_path = os.path.join(self.tag(), '.machines')
2985 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002986
2987
showardf1ae3542009-05-11 19:26:02 +00002988 def _next_group_name(self, group_name=''):
2989 """@returns a directory name to use for the next host group results."""
2990 if group_name:
2991 # Sanitize for use as a pathname.
2992 group_name = group_name.replace(os.path.sep, '_')
2993 if group_name.startswith('.'):
2994 group_name = '_' + group_name[1:]
2995 # Add a separator between the group name and 'group%d'.
2996 group_name += '.'
2997 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002998 query = models.HostQueueEntry.objects.filter(
2999 job=self.id).values('execution_subdir').distinct()
3000 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003001 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3002 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003003 if ids:
3004 next_id = max(ids) + 1
3005 else:
3006 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003007 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003008
3009
showard170873e2009-01-07 00:22:26 +00003010 def _write_control_file(self, execution_tag):
3011 control_path = _drone_manager.attach_file_to_execution(
3012 execution_tag, self.control_file)
3013 return control_path
mbligh36768f02008-02-22 18:28:33 +00003014
showardb2e2c322008-10-14 17:33:55 +00003015
showard2bab8f42008-11-12 18:15:22 +00003016 def get_group_entries(self, queue_entry_from_group):
3017 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003018 return list(HostQueueEntry.fetch(
3019 where='job_id=%s AND execution_subdir=%s',
3020 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003021
3022
showardb2e2c322008-10-14 17:33:55 +00003023 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003024 assert queue_entries
3025 execution_tag = queue_entries[0].execution_tag()
3026 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00003027 hostnames = ','.join([entry.get_host().hostname
3028 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003029
showard87ba02a2009-04-20 19:37:32 +00003030 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003031 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003032 ['-P', execution_tag, '-n',
3033 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003034 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003035
jadmanski0afbb632008-06-06 21:10:57 +00003036 if not self.is_server_job():
3037 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003038
showardb2e2c322008-10-14 17:33:55 +00003039 return params
mblighe2586682008-02-29 22:45:46 +00003040
mbligh36768f02008-02-22 18:28:33 +00003041
showardc9ae1782009-01-30 01:42:37 +00003042 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003043 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003044 return True
showard0fc38302008-10-23 00:44:07 +00003045 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003046 return queue_entry.get_host().dirty
3047 return False
showard21baa452008-10-21 00:08:39 +00003048
showardc9ae1782009-01-30 01:42:37 +00003049
showard2fe3f1d2009-07-06 20:19:11 +00003050 def should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003051 do_not_verify = (queue_entry.host.protection ==
3052 host_protections.Protection.DO_NOT_VERIFY)
3053 if do_not_verify:
3054 return False
3055 return self.run_verify
3056
3057
showard77182562009-06-10 00:16:05 +00003058 def get_pre_job_tasks(self, queue_entry):
3059 """
3060 Get a list of tasks to perform before the host_queue_entry
3061 may be used to run this Job (such as Cleanup & Verify).
3062
3063 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003064 it should be considered be ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003065 task in the list calls HostQueueEntry.on_pending(), which
3066 continues the flow of the job.
3067 """
showard21baa452008-10-21 00:08:39 +00003068 tasks = []
showardc9ae1782009-01-30 01:42:37 +00003069 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00003070 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2fe3f1d2009-07-06 20:19:11 +00003071 if self.should_run_verify(queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003072 tasks.append(VerifyTask(queue_entry=queue_entry))
3073 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00003074 return tasks
3075
3076
showardf1ae3542009-05-11 19:26:02 +00003077 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003078 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003079 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003080 else:
showardf1ae3542009-05-11 19:26:02 +00003081 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003082 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003083 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003084 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003085
3086 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003087 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003088
3089
3090 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003091 """
3092 @returns A tuple containing a list of HostQueueEntry instances to be
3093 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00003094 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003095 """
showard77182562009-06-10 00:16:05 +00003096 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003097 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003098 if atomic_group:
3099 num_entries_wanted = atomic_group.max_number_of_machines
3100 else:
3101 num_entries_wanted = self.synch_count
3102 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003103
showardf1ae3542009-05-11 19:26:02 +00003104 if num_entries_wanted > 0:
3105 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003106 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003107 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003108 params=(self.id, include_queue_entry.id)))
3109
3110 # Sort the chosen hosts by hostname before slicing.
3111 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3112 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3113 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3114 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003115
showardf1ae3542009-05-11 19:26:02 +00003116 # Sanity check. We'll only ever be called if this can be met.
3117 assert len(chosen_entries) >= self.synch_count
3118
3119 if atomic_group:
3120 # Look at any meta_host and dependency labels and pick the first
3121 # one that also specifies this atomic group. Use that label name
3122 # as the group name if possible (it is more specific).
3123 group_name = atomic_group.name
3124 for label in include_queue_entry.get_labels():
3125 if label.atomic_group_id:
3126 assert label.atomic_group_id == atomic_group.id
3127 group_name = label.name
3128 break
3129 else:
3130 group_name = ''
3131
3132 self._assign_new_group(chosen_entries, group_name=group_name)
3133 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003134
3135
showard77182562009-06-10 00:16:05 +00003136 def run_if_ready(self, queue_entry):
3137 """
3138 @returns An Agent instance to ultimately run this job if enough hosts
3139 are ready for it to run.
3140 @returns None and potentially cleans up excess hosts if this Job
3141 is not ready to run.
3142 """
showardb2e2c322008-10-14 17:33:55 +00003143 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003144 self.stop_if_necessary()
3145 return None
mbligh36768f02008-02-22 18:28:33 +00003146
showard77182562009-06-10 00:16:05 +00003147 if queue_entry.atomic_group:
3148 return self.run_with_ready_delay(queue_entry)
3149
3150 return self.run(queue_entry)
3151
3152
3153 def run_with_ready_delay(self, queue_entry):
3154 """
3155 Start a delay to wait for more hosts to enter Pending state before
3156 launching an atomic group job. Once set, the a delay cannot be reset.
3157
3158 @param queue_entry: The HostQueueEntry object to get atomic group
3159 info from and pass to run_if_ready when the delay is up.
3160
3161 @returns An Agent to run the job as appropriate or None if a delay
3162 has already been set.
3163 """
3164 assert queue_entry.job_id == self.id
3165 assert queue_entry.atomic_group
3166 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3167 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3168 over_max_threshold = (self._pending_count() >= pending_threshold)
3169 delay_expired = (self._delay_ready_task and
3170 time.time() >= self._delay_ready_task.end_time)
3171
3172 # Delay is disabled or we already have enough? Do not wait to run.
3173 if not delay or over_max_threshold or delay_expired:
3174 return self.run(queue_entry)
3175
3176 # A delay was previously scheduled.
3177 if self._delay_ready_task:
3178 return None
3179
3180 def run_job_after_delay():
3181 logging.info('Job %s done waiting for extra hosts.', self.id)
3182 return self.run(queue_entry)
3183
3184 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3185 callback=run_job_after_delay)
3186
3187 return Agent([self._delay_ready_task], num_processes=0)
3188
3189
3190 def run(self, queue_entry):
3191 """
3192 @param queue_entry: The HostQueueEntry instance calling this method.
3193 @returns An Agent instance to run this job or None if we've already
3194 been run.
3195 """
3196 if queue_entry.atomic_group and self._atomic_and_has_started():
3197 logging.error('Job.run() called on running atomic Job %d '
3198 'with HQE %s.', self.id, queue_entry)
3199 return None
showardf1ae3542009-05-11 19:26:02 +00003200 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3201 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003202
3203
showardf1ae3542009-05-11 19:26:02 +00003204 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003205 for queue_entry in queue_entries:
3206 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003207 params = self._get_autoserv_params(queue_entries)
3208 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003209 cmd=params, group_name=group_name)
3210 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003211 if self._delay_ready_task:
3212 # Cancel any pending callback that would try to run again
3213 # as we are already running.
3214 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003215
showard170873e2009-01-07 00:22:26 +00003216 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003217
3218
showardb000a8d2009-07-28 20:02:07 +00003219 def __str__(self):
3220 return '%s-%s' % (self.id, self.owner)
3221
3222
mbligh36768f02008-02-22 18:28:33 +00003223if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003224 main()