mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
mbligh36768f02008-02-22 18:28:33 +000024RESULTS_DIR = '.'
25AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000026DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000027AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
28
29if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000030 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000031AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
32AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
33
34if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000035 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000036
mbligh90a549d2008-03-25 23:52:34 +000037# how long to wait for autoserv to write a pidfile
38PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000039
showardd3dc1992009-04-22 21:01:40 +000040_AUTOSERV_PID_FILE = '.autoserv_execute'
41_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
42_PARSER_PID_FILE = '.parser_execute'
43
44_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
45 _PARSER_PID_FILE)
46
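# Note (added, inferred from the recovery code below): each pidfile name
# corresponds to one phase of a queue entry's lifecycle (the main autoserv run,
# post-job crash-info collection, and final results parsing), and the
# Running/Gathering/Parsing recovery paths each look for the matching file.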
showard35162b02009-03-03 02:17:30 +000047# error message to leave in results dir when an autoserv process disappears
48# mysteriously
49_LOST_PROCESS_ERROR = """\
50Autoserv failed abnormally during execution for this job, probably due to a
51system error on the Autotest server. Full results may not be available. Sorry.
52"""
53
mbligh6f8bab42008-02-29 22:45:14 +000054_db = None
mbligh36768f02008-02-22 18:28:33 +000055_shutdown = False
showard170873e2009-01-07 00:22:26 +000056_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
57_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000058_testing_mode = False
showard542e8402008-09-19 20:16:18 +000059_base_url = None
showardc85c21b2008-11-24 22:17:37 +000060_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000061_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000062
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
mbligh36768f02008-02-22 18:28:33 +000068def main():
showard27f33872009-04-07 18:20:53 +000069 try:
70 main_without_exception_handling()
showard29caa4b2009-05-26 19:27:09 +000071 except SystemExit:
72 raise
showard27f33872009-04-07 18:20:53 +000073 except:
74 logging.exception('Exception escaping in monitor_db')
75 raise
76
77
78def main_without_exception_handling():
showard136e6dc2009-06-10 19:38:49 +000079 setup_logging()
mbligh36768f02008-02-22 18:28:33 +000080
showard136e6dc2009-06-10 19:38:49 +000081 usage = 'usage: %prog [options] results_dir'
jadmanski0afbb632008-06-06 21:10:57 +000082 parser = optparse.OptionParser(usage)
83 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
84 action='store_true')
jadmanski0afbb632008-06-06 21:10:57 +000085 parser.add_option('--test', help='Indicate that scheduler is under ' +
86 'test and should use dummy autoserv and no parsing',
87 action='store_true')
88 (options, args) = parser.parse_args()
89 if len(args) != 1:
90 parser.print_usage()
91 return
mbligh36768f02008-02-22 18:28:33 +000092
showard5613c662009-06-08 23:30:33 +000093 scheduler_enabled = global_config.global_config.get_config_value(
94 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
95
96 if not scheduler_enabled:
97 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
98 "global_config's SCHEDULER section to enabled it. Exiting.")
mbligh6fbdb802009-08-03 16:42:55 +000099 logging.error(msg)
showard5613c662009-06-08 23:30:33 +0000100 sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
showardcca334f2009-03-12 20:38:34 +0000110 # Change the cwd while running to avoid issues in case we were launched from
111 # somewhere odd (such as a random NFS home directory of the person running
112 # sudo to launch us as the appropriate user).
113 os.chdir(RESULTS_DIR)
114
jadmanski0afbb632008-06-06 21:10:57 +0000115 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000116 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
117 "notify_email_statuses",
118 default='')
showardc85c21b2008-11-24 22:17:37 +0000119 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000120 _notify_email_statuses = [status for status in
121 re.split(r'[\s,;:]', notify_statuses_list.lower())
122 if status]
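    # Note (added; the example value is hypothetical): a config setting such as
    # "Failed, Aborted" becomes ['failed', 'aborted'] here, since the list is
    # lower-cased and split on whitespace, commas, semicolons and colons.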
showardc85c21b2008-11-24 22:17:37 +0000123
jadmanski0afbb632008-06-06 21:10:57 +0000124 if options.test:
125 global _autoserv_path
126 _autoserv_path = 'autoserv_dummy'
127 global _testing_mode
128 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000129
mbligh37eceaa2008-12-15 22:56:37 +0000130 # AUTOTEST_WEB.base_url is still a supported config option as some people
131 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000132 global _base_url
showard170873e2009-01-07 00:22:26 +0000133 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
134 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000135 if config_base_url:
136 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000137 else:
mbligh37eceaa2008-12-15 22:56:37 +0000138 # For the common case of everything running on a single server you
139 # can just set the hostname in a single place in the config file.
140 server_name = c.get_config_value('SERVER', 'hostname')
141 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000142 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000143 sys.exit(1)
144 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000145
showardc5afc462009-01-13 00:09:39 +0000146 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000147 server.start()
148
jadmanski0afbb632008-06-06 21:10:57 +0000149 try:
showard136e6dc2009-06-10 19:38:49 +0000150 init()
showardc5afc462009-01-13 00:09:39 +0000151 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000152 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000153
jadmanski0afbb632008-06-06 21:10:57 +0000154 while not _shutdown:
155 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000156 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000157 except:
showard170873e2009-01-07 00:22:26 +0000158 email_manager.manager.log_stacktrace(
159 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000160
showard170873e2009-01-07 00:22:26 +0000161 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000162 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000163 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000164 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
showard136e6dc2009-06-10 19:38:49 +0000167def setup_logging():
168 log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
169 log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
170 logging_manager.configure_logging(
171 scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
172 logfile_name=log_name)
173
174
mbligh36768f02008-02-22 18:28:33 +0000175def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000176 global _shutdown
177 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000178 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
showard136e6dc2009-06-10 19:38:49 +0000181def init():
showardb18134f2009-03-20 20:52:18 +0000182 logging.info("%s> dispatcher starting", time.strftime("%X %x"))
183 logging.info("My PID is %d", os.getpid())
mbligh36768f02008-02-22 18:28:33 +0000184
mblighfb676032009-04-01 18:25:38 +0000185 utils.write_pid("monitor_db")
186
showardb1e51872008-10-07 11:08:18 +0000187 if _testing_mode:
188 global_config.global_config.override_config_value(
showard170873e2009-01-07 00:22:26 +0000189 DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')
showardb1e51872008-10-07 11:08:18 +0000190
jadmanski0afbb632008-06-06 21:10:57 +0000191 os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
192 global _db
showard170873e2009-01-07 00:22:26 +0000193 _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
jadmanski0afbb632008-06-06 21:10:57 +0000194 _db.connect()
mbligh36768f02008-02-22 18:28:33 +0000195
showardfa8629c2008-11-04 16:51:23 +0000196 # ensure Django connection is in autocommit
197 setup_django_environment.enable_autocommit()
showard844960a2009-05-29 18:41:18 +0000198 # bypass the readonly connection
199 readonly_connection.ReadOnlyConnection.set_globally_disabled(True)
showardfa8629c2008-11-04 16:51:23 +0000200
showardb18134f2009-03-20 20:52:18 +0000201 logging.info("Setting signal handler")
jadmanski0afbb632008-06-06 21:10:57 +0000202 signal.signal(signal.SIGINT, handle_sigint)
203
showardd1ee1dd2009-01-07 21:33:08 +0000204 drones = global_config.global_config.get_config_value(
205 scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
206 drone_list = [hostname.strip() for hostname in drones.split(',')]
showard170873e2009-01-07 00:22:26 +0000207 results_host = global_config.global_config.get_config_value(
showardd1ee1dd2009-01-07 21:33:08 +0000208 scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
showard170873e2009-01-07 00:22:26 +0000209 _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)
210
showardb18134f2009-03-20 20:52:18 +0000211 logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
showarded2afea2009-07-07 20:54:07 +0000214def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
215 verbose=True):
showardf1ae3542009-05-11 19:26:02 +0000216 """
217 @returns The autoserv command line as a list of executable + parameters.
218
219 @param machines - string - A machine or comma separated list of machines
220 for the (-m) flag.
showardf1ae3542009-05-11 19:26:02 +0000221 @param extra_args - list - Additional arguments to pass to autoserv.
222 @param job - Job object - If supplied, -u owner and -l name parameters
223 will be added.
224 @param queue_entry - A HostQueueEntry object - If supplied and no Job
225 object was supplied, this will be used to lookup the Job object.
226 """
showard87ba02a2009-04-20 19:37:32 +0000227 autoserv_argv = [_autoserv_path, '-p', '-m', machines,
showarded2afea2009-07-07 20:54:07 +0000228 '-r', drone_manager.WORKING_DIRECTORY]
showard87ba02a2009-04-20 19:37:32 +0000229 if job or queue_entry:
230 if not job:
231 job = queue_entry.job
232 autoserv_argv += ['-u', job.owner, '-l', job.name]
showarde9c69362009-06-30 01:58:03 +0000233 if verbose:
234 autoserv_argv.append('--verbose')
showard87ba02a2009-04-20 19:37:32 +0000235 return autoserv_argv + extra_args
236
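# Illustrative sketch (added; the hostnames and job fields are hypothetical):
# for a job named "kernel_test" owned by "someuser" on two machines with
# verbose=True, _autoserv_command_line() returns something like
#   [_autoserv_path, '-p', '-m', 'host1,host2',
#    '-r', drone_manager.WORKING_DIRECTORY,
#    '-u', 'someuser', '-l', 'kernel_test', '--verbose'] + extra_args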
237
showard89f84db2009-03-12 20:39:13 +0000238class SchedulerError(Exception):
239 """Raised by HostScheduler when an inconsistent state occurs."""
240
241
showard63a34772008-08-18 19:32:50 +0000242class HostScheduler(object):
243 def _get_ready_hosts(self):
244 # avoid any host with a currently active queue entry against it
245 hosts = Host.fetch(
246 joins='LEFT JOIN host_queue_entries AS active_hqe '
247 'ON (hosts.id = active_hqe.host_id AND '
showardb1e51872008-10-07 11:08:18 +0000248 'active_hqe.active)',
showard63a34772008-08-18 19:32:50 +0000249 where="active_hqe.host_id IS NULL "
showardb1e51872008-10-07 11:08:18 +0000250 "AND NOT hosts.locked "
showard63a34772008-08-18 19:32:50 +0000251 "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
252 return dict((host.id, host) for host in hosts)
253
254
255 @staticmethod
256 def _get_sql_id_list(id_list):
257 return ','.join(str(item_id) for item_id in id_list)
258
259
260 @classmethod
showard989f25d2008-10-01 11:38:11 +0000261 def _get_many2many_dict(cls, query, id_list, flip=False):
mbligh849a0f62008-08-28 20:12:19 +0000262 if not id_list:
263 return {}
showard63a34772008-08-18 19:32:50 +0000264 query %= cls._get_sql_id_list(id_list)
265 rows = _db.execute(query)
showard989f25d2008-10-01 11:38:11 +0000266 return cls._process_many2many_dict(rows, flip)
267
268
269 @staticmethod
270 def _process_many2many_dict(rows, flip=False):
showard63a34772008-08-18 19:32:50 +0000271 result = {}
272 for row in rows:
showard89f84db2009-03-12 20:39:13 +0000273 left_id, right_id = int(row[0]), int(row[1])
showard989f25d2008-10-01 11:38:11 +0000274 if flip:
275 left_id, right_id = right_id, left_id
showard63a34772008-08-18 19:32:50 +0000276 result.setdefault(left_id, set()).add(right_id)
277 return result
278
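    # Illustrative note (added): given rows such as [(1, 10), (1, 11), (2, 10)],
    # _process_many2many_dict() returns {1: set([10, 11]), 2: set([10])}; with
    # flip=True the columns swap roles, giving {10: set([1, 2]), 11: set([1])}.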
279
280 @classmethod
281 def _get_job_acl_groups(cls, job_ids):
282 query = """
showardd9ac4452009-02-07 02:04:37 +0000283 SELECT jobs.id, acl_groups_users.aclgroup_id
showard63a34772008-08-18 19:32:50 +0000284 FROM jobs
285 INNER JOIN users ON users.login = jobs.owner
286 INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
287 WHERE jobs.id IN (%s)
288 """
289 return cls._get_many2many_dict(query, job_ids)
290
291
292 @classmethod
293 def _get_job_ineligible_hosts(cls, job_ids):
294 query = """
295 SELECT job_id, host_id
296 FROM ineligible_host_queues
297 WHERE job_id IN (%s)
298 """
299 return cls._get_many2many_dict(query, job_ids)
300
301
302 @classmethod
showard989f25d2008-10-01 11:38:11 +0000303 def _get_job_dependencies(cls, job_ids):
304 query = """
305 SELECT job_id, label_id
306 FROM jobs_dependency_labels
307 WHERE job_id IN (%s)
308 """
309 return cls._get_many2many_dict(query, job_ids)
310
311
312 @classmethod
showard63a34772008-08-18 19:32:50 +0000313 def _get_host_acls(cls, host_ids):
314 query = """
showardd9ac4452009-02-07 02:04:37 +0000315 SELECT host_id, aclgroup_id
showard63a34772008-08-18 19:32:50 +0000316 FROM acl_groups_hosts
317 WHERE host_id IN (%s)
318 """
319 return cls._get_many2many_dict(query, host_ids)
320
321
322 @classmethod
323 def _get_label_hosts(cls, host_ids):
showardfa8629c2008-11-04 16:51:23 +0000324 if not host_ids:
325 return {}, {}
showard63a34772008-08-18 19:32:50 +0000326 query = """
327 SELECT label_id, host_id
328 FROM hosts_labels
329 WHERE host_id IN (%s)
showard989f25d2008-10-01 11:38:11 +0000330 """ % cls._get_sql_id_list(host_ids)
331 rows = _db.execute(query)
332 labels_to_hosts = cls._process_many2many_dict(rows)
333 hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
334 return labels_to_hosts, hosts_to_labels
335
336
337 @classmethod
338 def _get_labels(cls):
339 return dict((label.id, label) for label in Label.fetch())
showard63a34772008-08-18 19:32:50 +0000340
341
342 def refresh(self, pending_queue_entries):
343 self._hosts_available = self._get_ready_hosts()
344
345 relevant_jobs = [queue_entry.job_id
346 for queue_entry in pending_queue_entries]
347 self._job_acls = self._get_job_acl_groups(relevant_jobs)
348 self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
showard989f25d2008-10-01 11:38:11 +0000349 self._job_dependencies = self._get_job_dependencies(relevant_jobs)
showard63a34772008-08-18 19:32:50 +0000350
351 host_ids = self._hosts_available.keys()
352 self._host_acls = self._get_host_acls(host_ids)
showard989f25d2008-10-01 11:38:11 +0000353 self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)
354
355 self._labels = self._get_labels()
showard63a34772008-08-18 19:32:50 +0000356
357
358 def _is_acl_accessible(self, host_id, queue_entry):
359 job_acls = self._job_acls.get(queue_entry.job_id, set())
360 host_acls = self._host_acls.get(host_id, set())
361 return len(host_acls.intersection(job_acls)) > 0
362
363
showard989f25d2008-10-01 11:38:11 +0000364 def _check_job_dependencies(self, job_dependencies, host_labels):
365 missing = job_dependencies - host_labels
showard89f84db2009-03-12 20:39:13 +0000366 return len(missing) == 0
showard989f25d2008-10-01 11:38:11 +0000367
368
369 def _check_only_if_needed_labels(self, job_dependencies, host_labels,
370 queue_entry):
showardade14e22009-01-26 22:38:32 +0000371 if not queue_entry.meta_host:
372 # bypass only_if_needed labels when a specific host is selected
373 return True
374
showard989f25d2008-10-01 11:38:11 +0000375 for label_id in host_labels:
376 label = self._labels[label_id]
377 if not label.only_if_needed:
378 # we don't care about non-only_if_needed labels
379 continue
380 if queue_entry.meta_host == label_id:
381 # if the label was requested in a metahost it's OK
382 continue
383 if label_id not in job_dependencies:
384 return False
385 return True
386
387
showard89f84db2009-03-12 20:39:13 +0000388 def _check_atomic_group_labels(self, host_labels, queue_entry):
389 """
390 Determine if the given HostQueueEntry's atomic group settings are okay
391 to schedule on a host with the given labels.
392
showard6157c632009-07-06 20:19:31 +0000393 @param host_labels: A list of label ids that the host has.
394 @param queue_entry: The HostQueueEntry being considered for the host.
showard89f84db2009-03-12 20:39:13 +0000395
396 @returns True if atomic group settings are okay, False otherwise.
397 """
showard6157c632009-07-06 20:19:31 +0000398 return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
showard89f84db2009-03-12 20:39:13 +0000399 queue_entry.atomic_group_id)
400
401
showard6157c632009-07-06 20:19:31 +0000402 def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
showard89f84db2009-03-12 20:39:13 +0000403 """
404 Return the atomic group label id for a host with the given set of
405 labels if any, or None otherwise. Logs an error if more than
406 one atomic group is found in the set of labels.
407
showard6157c632009-07-06 20:19:31 +0000408 @param host_labels: A list of label ids that the host has.
409 @param queue_entry: The HostQueueEntry we're testing. Only used for
410 extra info in a potential logged error message.
showard89f84db2009-03-12 20:39:13 +0000411
412 @returns The id of the atomic group found on a label in host_labels
413 or None if no atomic group label is found.
showard89f84db2009-03-12 20:39:13 +0000414 """
showard6157c632009-07-06 20:19:31 +0000415 atomic_labels = [self._labels[label_id] for label_id in host_labels
416 if self._labels[label_id].atomic_group_id is not None]
417 atomic_ids = set(label.atomic_group_id for label in atomic_labels)
showard89f84db2009-03-12 20:39:13 +0000418 if not atomic_ids:
419 return None
420 if len(atomic_ids) > 1:
showard6157c632009-07-06 20:19:31 +0000421 logging.error('More than one Atomic Group on HQE "%s" via: %r',
422 queue_entry, atomic_labels)
423 return atomic_ids.pop()
showard89f84db2009-03-12 20:39:13 +0000424
425
426 def _get_atomic_group_labels(self, atomic_group_id):
427 """
428 Look up the label ids that an atomic_group is associated with.
429
430 @param atomic_group_id - The id of the AtomicGroup to look up.
431
432 @returns A generator yielding Label ids for this atomic group.
433 """
434 return (id for id, label in self._labels.iteritems()
435 if label.atomic_group_id == atomic_group_id
436 and not label.invalid)
437
438
showard54c1ea92009-05-20 00:32:58 +0000439 def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
showard89f84db2009-03-12 20:39:13 +0000440 """
441 @param group_hosts - A sequence of Host ids to test for usability
442 and eligibility against the Job associated with queue_entry.
443 @param queue_entry - The HostQueueEntry that these hosts are being
444 tested for eligibility against.
445
446 @returns A subset of group_hosts Host ids that are eligible for the
447 supplied queue_entry.
448 """
449 return set(host_id for host_id in group_hosts
450 if self._is_host_usable(host_id)
451 and self._is_host_eligible_for_job(host_id, queue_entry))
452
453
showard989f25d2008-10-01 11:38:11 +0000454 def _is_host_eligible_for_job(self, host_id, queue_entry):
showard2924b0a2009-06-18 23:16:15 +0000455 if self._is_host_invalid(host_id):
456 # if an invalid host is scheduled for a job, it's a one-time host
457 # and it therefore bypasses eligibility checks. note this can only
458 # happen for non-metahosts, because invalid hosts have their label
459 # relationships cleared.
460 return True
461
showard989f25d2008-10-01 11:38:11 +0000462 job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
463 host_labels = self._host_labels.get(host_id, set())
mblighc993bee2008-10-03 03:42:34 +0000464
showard89f84db2009-03-12 20:39:13 +0000465 return (self._is_acl_accessible(host_id, queue_entry) and
466 self._check_job_dependencies(job_dependencies, host_labels) and
467 self._check_only_if_needed_labels(
468 job_dependencies, host_labels, queue_entry) and
469 self._check_atomic_group_labels(host_labels, queue_entry))
showard989f25d2008-10-01 11:38:11 +0000470
471
showard2924b0a2009-06-18 23:16:15 +0000472 def _is_host_invalid(self, host_id):
473 host_object = self._hosts_available.get(host_id, None)
474 return host_object and host_object.invalid
475
476
showard63a34772008-08-18 19:32:50 +0000477 def _schedule_non_metahost(self, queue_entry):
showard989f25d2008-10-01 11:38:11 +0000478 if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000479 return None
480 return self._hosts_available.pop(queue_entry.host_id, None)
481
482
483 def _is_host_usable(self, host_id):
484 if host_id not in self._hosts_available:
485 # host was already used during this scheduling cycle
486 return False
487 if self._hosts_available[host_id].invalid:
488 # Invalid hosts cannot be used for metahosts. They're included in
489 # the original query because they can be used by non-metahosts.
490 return False
491 return True
492
493
494 def _schedule_metahost(self, queue_entry):
495 label_id = queue_entry.meta_host
496 hosts_in_label = self._label_hosts.get(label_id, set())
497 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
498 set())
499
500 # must iterate over a copy so we can mutate the original while iterating
501 for host_id in list(hosts_in_label):
502 if not self._is_host_usable(host_id):
503 hosts_in_label.remove(host_id)
504 continue
505 if host_id in ineligible_host_ids:
506 continue
showard989f25d2008-10-01 11:38:11 +0000507 if not self._is_host_eligible_for_job(host_id, queue_entry):
showard63a34772008-08-18 19:32:50 +0000508 continue
509
showard89f84db2009-03-12 20:39:13 +0000510 # Remove the host from our cached internal state before returning
511 # the host object.
showard63a34772008-08-18 19:32:50 +0000512 hosts_in_label.remove(host_id)
513 return self._hosts_available.pop(host_id)
514 return None
515
516
517 def find_eligible_host(self, queue_entry):
518 if not queue_entry.meta_host:
showard89f84db2009-03-12 20:39:13 +0000519 assert queue_entry.host_id is not None
showard63a34772008-08-18 19:32:50 +0000520 return self._schedule_non_metahost(queue_entry)
showard89f84db2009-03-12 20:39:13 +0000521 assert queue_entry.atomic_group_id is None
showard63a34772008-08-18 19:32:50 +0000522 return self._schedule_metahost(queue_entry)
523
524
showard89f84db2009-03-12 20:39:13 +0000525 def find_eligible_atomic_group(self, queue_entry):
526 """
527 Given an atomic group host queue entry, locate an appropriate group
528 of hosts for the associated job to run on.
529
530 The caller is responsible for creating new HQEs for the additional
531 hosts returned in order to run the actual job on them.
532
533 @returns A list of Host instances in a ready state to satisfy this
534 atomic group scheduling. Hosts will all belong to the same
535 atomic group label as specified by the queue_entry.
536 An empty list will be returned if no suitable atomic
537 group could be found.
538
539 TODO(gps): what is responsible for kicking off any attempted repairs on
540 a group of hosts? not this function, but something needs to. We do
541 not communicate that reason for returning [] outside of here...
542 For now, we'll just be unschedulable if enough hosts within one group
543 enter Repair Failed state.
544 """
545 assert queue_entry.atomic_group_id is not None
546 job = queue_entry.job
547 assert job.synch_count and job.synch_count > 0
showard77182562009-06-10 00:16:05 +0000548 atomic_group = queue_entry.atomic_group
showard89f84db2009-03-12 20:39:13 +0000549 if job.synch_count > atomic_group.max_number_of_machines:
550 # Such a Job and HostQueueEntry should never be possible to
551 # create using the frontend. Regardless, we can't process it.
552 # Abort it immediately and log an error on the scheduler.
553 queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
showard7629f142009-03-27 21:02:02 +0000554 logging.error(
555 'Error: job %d synch_count=%d > requested atomic_group %d '
556 'max_number_of_machines=%d. Aborted host_queue_entry %d.',
557 job.id, job.synch_count, atomic_group.id,
558 atomic_group.max_number_of_machines, queue_entry.id)
showard89f84db2009-03-12 20:39:13 +0000559 return []
560 hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
561 ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
562 set())
563
564 # Look in each label associated with atomic_group until we find one with
565 # enough hosts to satisfy the job.
566 for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
567 group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
568 if queue_entry.meta_host is not None:
569 # If we have a metahost label, only allow its hosts.
570 group_hosts.intersection_update(hosts_in_label)
571 group_hosts -= ineligible_host_ids
showard54c1ea92009-05-20 00:32:58 +0000572 eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
showard89f84db2009-03-12 20:39:13 +0000573 group_hosts, queue_entry)
574
575 # Job.synch_count is treated as "minimum synch count" when
576 # scheduling for an atomic group of hosts. The atomic group
577 # number of machines is the maximum to pick out of a single
578 # atomic group label for scheduling at one time.
579 min_hosts = job.synch_count
580 max_hosts = atomic_group.max_number_of_machines
581
showard54c1ea92009-05-20 00:32:58 +0000582 if len(eligible_host_ids_in_group) < min_hosts:
showard89f84db2009-03-12 20:39:13 +0000583 # Not enough eligible hosts in this atomic group label.
584 continue
585
showard54c1ea92009-05-20 00:32:58 +0000586 eligible_hosts_in_group = [self._hosts_available[id]
587 for id in eligible_host_ids_in_group]
showardef519212009-05-08 02:29:53 +0000588 # So that they show up in a sane order when viewing the job.
showard54c1ea92009-05-20 00:32:58 +0000589 eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)
showardef519212009-05-08 02:29:53 +0000590
showard89f84db2009-03-12 20:39:13 +0000591 # Limit ourselves to scheduling the atomic group size.
592 if len(eligible_hosts_in_group) > max_hosts:
showardef519212009-05-08 02:29:53 +0000593 eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]
showard89f84db2009-03-12 20:39:13 +0000594
595 # Remove the selected hosts from our cached internal state
596 # of available hosts in order to return the Host objects.
597 host_list = []
showard54c1ea92009-05-20 00:32:58 +0000598 for host in eligible_hosts_in_group:
599 hosts_in_label.discard(host.id)
600 self._hosts_available.pop(host.id)
601 host_list.append(host)
showard89f84db2009-03-12 20:39:13 +0000602 return host_list
603
604 return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000608 def __init__(self):
609 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000610 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000611 self._host_scheduler = HostScheduler()
mblighf3294cc2009-04-08 21:17:38 +0000612 user_cleanup_time = scheduler_config.config.clean_interval
613 self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
614 _db, user_cleanup_time)
615 self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
showard170873e2009-01-07 00:22:26 +0000616 self._host_agents = {}
617 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
showard915958d2009-04-22 21:00:58 +0000620 def initialize(self, recover_hosts=True):
621 self._periodic_cleanup.initialize()
622 self._24hr_upkeep.initialize()
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 # always recover processes
625 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000626
jadmanski0afbb632008-06-06 21:10:57 +0000627 if recover_hosts:
628 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
jadmanski0afbb632008-06-06 21:10:57 +0000631 def tick(self):
showard170873e2009-01-07 00:22:26 +0000632 _drone_manager.refresh()
mblighf3294cc2009-04-08 21:17:38 +0000633 self._run_cleanup()
jadmanski0afbb632008-06-06 21:10:57 +0000634 self._find_aborting()
showard1ff7b2e2009-05-15 23:17:18 +0000635 self._find_reverify()
showard29f7cd22009-04-29 21:16:24 +0000636 self._process_recurring_runs()
jadmanski0afbb632008-06-06 21:10:57 +0000637 self._schedule_new_jobs()
638 self._handle_agents()
showard170873e2009-01-07 00:22:26 +0000639 _drone_manager.execute_actions()
640 email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
mblighf3294cc2009-04-08 21:17:38 +0000643 def _run_cleanup(self):
644 self._periodic_cleanup.run_cleanup_maybe()
645 self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def add_agent(self, agent):
660 self._agents.append(agent)
661 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000662 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
663 self._register_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000690 self._register_pidfiles()
691 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000692 self._recover_all_recoverable_entries()
showard6af73ad2009-07-28 20:00:58 +0000693 self._requeue_starting_entries()
showard6878e8b2009-07-20 22:37:45 +0000694 self._check_for_remaining_active_entries()
showard170873e2009-01-07 00:22:26 +0000695 self._reverify_remaining_hosts()
696 # reinitialize drones after killing orphaned processes, since they can
697 # leave around files when they die
698 _drone_manager.execute_actions()
699 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000700
showard170873e2009-01-07 00:22:26 +0000701
702 def _register_pidfiles(self):
703 # during recovery we may need to read pidfiles for both running and
704 # parsing entries
705 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000706 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000707 special_tasks = models.SpecialTask.objects.filter(is_active=True)
708 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000709 for pidfile_name in _ALL_PIDFILE_NAMES:
710 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000711 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000712 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000713
714
showarded2afea2009-07-07 20:54:07 +0000715 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
716 run_monitor = PidfileRunMonitor()
717 run_monitor.attach_to_existing_process(execution_path,
718 pidfile_name=pidfile_name)
719 if run_monitor.has_process():
720 orphans.discard(run_monitor.get_process())
721 return run_monitor, '(process %s)' % run_monitor.get_process()
722 return None, 'without process'
723
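    # Note (added): callers receive a (run_monitor, description) pair; the
    # monitor is None when no live process is found for the pidfile, and any
    # process that is found is discarded from the orphan set so it is not
    # reported as an unrecovered orphan later in recovery.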
724
showardd3dc1992009-04-22 21:01:40 +0000725 def _recover_entries_with_status(self, status, orphans, pidfile_name,
726 recover_entries_fn):
727 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000728 for queue_entry in queue_entries:
729 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000730 # synchronous job we've already recovered
731 continue
showardd3dc1992009-04-22 21:01:40 +0000732 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000733 run_monitor, process_string = self._get_recovery_run_monitor(
734 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000735
showarded2afea2009-07-07 20:54:07 +0000736 logging.info('Recovering %s entry %s %s', status.lower(),
737 ', '.join(str(entry) for entry in queue_entries),
738 process_string)
showardd3dc1992009-04-22 21:01:40 +0000739 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000740
741
showard6878e8b2009-07-20 22:37:45 +0000742 def _check_for_remaining_orphan_processes(self, orphans):
743 if not orphans:
744 return
745 subject = 'Unrecovered orphan autoserv processes remain'
746 message = '\n'.join(str(process) for process in orphans)
747 email_manager.manager.enqueue_notify_email(subject, message)
mbligh5fa9e112009-08-03 16:46:06 +0000748
749 die_on_orphans = global_config.global_config.get_config_value(
750 scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)
751
752 if die_on_orphans:
753 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000754
showard170873e2009-01-07 00:22:26 +0000755
showardd3dc1992009-04-22 21:01:40 +0000756 def _recover_running_entries(self, orphans):
757 def recover_entries(job, queue_entries, run_monitor):
758 if run_monitor is not None:
showarded2afea2009-07-07 20:54:07 +0000759 queue_task = QueueTask(job=job, queue_entries=queue_entries,
760 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000761 self.add_agent(Agent(tasks=[queue_task],
762 num_processes=len(queue_entries)))
showard6878e8b2009-07-20 22:37:45 +0000763 else:
764 # we could do better, but this retains legacy behavior for now
765 for queue_entry in queue_entries:
766 logging.info('Requeuing running HQE %s since it has no '
767 'process' % queue_entry)
768 queue_entry.requeue()
showardd3dc1992009-04-22 21:01:40 +0000769
770 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000771 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000772 recover_entries)
773
774
775 def _recover_gathering_entries(self, orphans):
776 def recover_entries(job, queue_entries, run_monitor):
777 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000778 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000779 self.add_agent(Agent([gather_task]))
780
781 self._recover_entries_with_status(
782 models.HostQueueEntry.Status.GATHERING,
783 orphans, _CRASHINFO_PID_FILE, recover_entries)
784
785
786 def _recover_parsing_entries(self, orphans):
787 def recover_entries(job, queue_entries, run_monitor):
788 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000789 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000790 self.add_agent(Agent([reparse_task], num_processes=0))
791
792 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
793 orphans, _PARSER_PID_FILE,
794 recover_entries)
795
796
797 def _recover_all_recoverable_entries(self):
798 orphans = _drone_manager.get_orphaned_autoserv_processes()
799 self._recover_running_entries(orphans)
800 self._recover_gathering_entries(orphans)
801 self._recover_parsing_entries(orphans)
showarded2afea2009-07-07 20:54:07 +0000802 self._recover_special_tasks(orphans)
showard6878e8b2009-07-20 22:37:45 +0000803 self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000804
showard97aed502008-11-04 02:01:24 +0000805
showarded2afea2009-07-07 20:54:07 +0000806 def _recover_special_tasks(self, orphans):
showard2fe3f1d2009-07-06 20:19:11 +0000807 """\
808 Recovers all special tasks that have started running but have not
809 completed.
810 """
811
812 tasks = models.SpecialTask.objects.filter(is_active=True,
813 is_complete=False)
814 # Use ordering to force NULL queue_entry_id's to the end of the list
showarda5288b42009-07-28 20:06:08 +0000815 for task in tasks.order_by('-queue_entry__id'):
showard9b6ec502009-08-20 23:25:17 +0000816 if self.host_has_agent(task.host):
817 raise SchedulerError(
818 "%s already has a host agent %s." % (
819 task, self._host_agents.get(task.host.id)))
showard2fe3f1d2009-07-06 20:19:11 +0000820
821 host = Host(id=task.host.id)
822 queue_entry = None
823 if task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000824 queue_entry = HostQueueEntry(id=task.queue_entry.id)
showard2fe3f1d2009-07-06 20:19:11 +0000825
showarded2afea2009-07-07 20:54:07 +0000826 run_monitor, process_string = self._get_recovery_run_monitor(
827 task.execution_path(), _AUTOSERV_PID_FILE, orphans)
828
829 logging.info('Recovering %s %s', task, process_string)
830 self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000831
832
showarded2afea2009-07-07 20:54:07 +0000833 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000834 """\
835 Recovers a single special task.
836 """
837 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000838 agent_tasks = self._recover_verify(task, host, queue_entry,
839 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000840 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000841 agent_tasks = self._recover_repair(task, host, queue_entry,
842 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000843 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000844 agent_tasks = self._recover_cleanup(task, host, queue_entry,
845 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000846 else:
847 # Should never happen
848 logging.error(
849 "Special task id %d had invalid task %s", (task.id, task.task))
850
851 self.add_agent(Agent(agent_tasks))
852
853
showarded2afea2009-07-07 20:54:07 +0000854 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000855 """\
856 Recovers a verify task.
857 No associated queue entry: Verify host
858 With associated queue entry: Verify host, and run associated queue
859 entry
860 """
861 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000862 return [VerifyTask(host=host, task=task,
863 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000864 else:
showarded2afea2009-07-07 20:54:07 +0000865 return [VerifyTask(queue_entry=queue_entry, task=task,
866 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000867 SetEntryPendingTask(queue_entry=queue_entry)]
868
869
showarded2afea2009-07-07 20:54:07 +0000870 def _recover_repair(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000871 """\
872 Recovers a repair task.
873 Always repair host
874 """
showarded2afea2009-07-07 20:54:07 +0000875 return [RepairTask(host=host, queue_entry=queue_entry, task=task,
876 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000877
878
showarded2afea2009-07-07 20:54:07 +0000879 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000880 """\
881 Recovers a cleanup task.
882 No associated queue entry: Clean host
883 With associated queue entry: Clean host, verify host if needed, and
884 run associated queue entry
885 """
886 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000887 return [CleanupTask(host=host, task=task,
888 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000889 else:
890 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000891 task=task,
892 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000893 if queue_entry.job.should_run_verify(queue_entry):
894 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
895 agent_tasks.append(
896 SetEntryPendingTask(queue_entry=queue_entry))
897 return agent_tasks
898
899
showard6af73ad2009-07-28 20:00:58 +0000900 def _requeue_starting_entries(self):
901 # temporary measure until we implement proper recovery of Starting HQEs
902 for entry in HostQueueEntry.fetch(where='status="Starting"'):
903 logging.info('Requeuing "Starting" queue entry %s' % entry)
904 assert not self.get_agents_for_entry(entry)
905 assert entry.host.status == models.Host.Status.PENDING
906 self._reverify_hosts_where('id = %s' % entry.host.id)
907 entry.requeue()
908
909
showard6878e8b2009-07-20 22:37:45 +0000910 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000911 queue_entries = HostQueueEntry.fetch(
showard70a294f2009-08-20 23:33:21 +0000912 where='active AND NOT complete AND status != "Pending"')
showardd3dc1992009-04-22 21:01:40 +0000913
showarde8e37072009-08-20 23:31:30 +0000914 unrecovered_active_hqes = [entry for entry in queue_entries
915 if not self.get_agents_for_entry(entry)]
916 if unrecovered_active_hqes:
917 message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
918 raise SchedulerError(
919 '%d unrecovered active host queue entries:\n%s' %
920 (len(unrecovered_active_hqes), message))
showard170873e2009-01-07 00:22:26 +0000921
922
showard1ff7b2e2009-05-15 23:17:18 +0000923 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000924 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000925 task=models.SpecialTask.Task.VERIFY, is_active=False,
926 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000927
showard2fe3f1d2009-07-06 20:19:11 +0000928 for task in tasks:
929 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
930 if host.locked or host.invalid or self.host_has_agent(host):
931 continue
showard6d7b2ff2009-06-10 00:16:47 +0000932
showard2fe3f1d2009-07-06 20:19:11 +0000933 logging.info('Force reverifying host %s', host.hostname)
934 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000935
936
showard170873e2009-01-07 00:22:26 +0000937 def _reverify_remaining_hosts(self):
showarded2afea2009-07-07 20:54:07 +0000938 # recover active hosts that have not yet been recovered, although this
showard170873e2009-01-07 00:22:26 +0000939 # should never happen
showarded2afea2009-07-07 20:54:07 +0000940 message = ('Recovering active host %s - this probably indicates a '
showard170873e2009-01-07 00:22:26 +0000941 'scheduler bug')
showarded2afea2009-07-07 20:54:07 +0000942 self._reverify_hosts_where(
943 "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
944 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000945
946
jadmanski0afbb632008-06-06 21:10:57 +0000947 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000948 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000949 full_where='locked = 0 AND invalid = 0 AND ' + where
950 for host in Host.fetch(where=full_where):
951 if self.host_has_agent(host):
952 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000953 continue
showard170873e2009-01-07 00:22:26 +0000954 if print_message:
showardb18134f2009-03-20 20:52:18 +0000955 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000956 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000957 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000958
959
jadmanski0afbb632008-06-06 21:10:57 +0000960 def _recover_hosts(self):
961 # recover "Repair Failed" hosts
962 message = 'Reverifying dead host %s'
963 self._reverify_hosts_where("status = 'Repair Failed'",
964 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000965
966
showard04c82c52008-05-29 19:38:12 +0000967
showardb95b1bd2008-08-15 18:11:04 +0000968 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000969 # prioritize by job priority, then non-metahost over metahost, then FIFO
970 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000971 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000972 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000973 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000974
975
showard89f84db2009-03-12 20:39:13 +0000976 def _refresh_pending_queue_entries(self):
977 """
978 Look up the pending HostQueueEntries and call our HostScheduler
979 refresh() method given that list. Return the list.
980
981 @returns A list of pending HostQueueEntries sorted in priority order.
982 """
showard63a34772008-08-18 19:32:50 +0000983 queue_entries = self._get_pending_queue_entries()
984 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000985 return []
showardb95b1bd2008-08-15 18:11:04 +0000986
showard63a34772008-08-18 19:32:50 +0000987 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000988
showard89f84db2009-03-12 20:39:13 +0000989 return queue_entries
990
991
992 def _schedule_atomic_group(self, queue_entry):
993 """
994 Schedule the given queue_entry on an atomic group of hosts.
995
996 Returns immediately if there are insufficient available hosts.
997
998 Creates new HostQueueEntries based off of queue_entry for the
999 scheduled hosts and starts them all running.
1000 """
1001 # This is a virtual host queue entry representing an entire
1002 # atomic group; find a group and schedule their hosts.
1003 group_hosts = self._host_scheduler.find_eligible_atomic_group(
1004 queue_entry)
1005 if not group_hosts:
1006 return
showardcbe6f942009-06-17 19:33:49 +00001007
1008 logging.info('Expanding atomic group entry %s with hosts %s',
1009 queue_entry,
1010 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001011 # The first assigned host uses the original HostQueueEntry
1012 group_queue_entries = [queue_entry]
1013 for assigned_host in group_hosts[1:]:
1014 # Create a new HQE for every additional assigned_host.
1015 new_hqe = HostQueueEntry.clone(queue_entry)
1016 new_hqe.save()
1017 group_queue_entries.append(new_hqe)
1018 assert len(group_queue_entries) == len(group_hosts)
1019 for queue_entry, host in itertools.izip(group_queue_entries,
1020 group_hosts):
1021 self._run_queue_entry(queue_entry, host)
1022
1023
1024 def _schedule_new_jobs(self):
1025 queue_entries = self._refresh_pending_queue_entries()
1026 if not queue_entries:
1027 return
1028
showard63a34772008-08-18 19:32:50 +00001029 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001030 if (queue_entry.atomic_group_id is None or
1031 queue_entry.host_id is not None):
1032 assigned_host = self._host_scheduler.find_eligible_host(
1033 queue_entry)
1034 if assigned_host:
1035 self._run_queue_entry(queue_entry, assigned_host)
1036 else:
1037 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001038
1039
1040 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +00001041 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
1042 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001043
1044
jadmanski0afbb632008-06-06 21:10:57 +00001045 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001046 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
showardf4a2e502009-07-28 20:06:39 +00001047 logging.info('Aborting %s', entry)
showardd3dc1992009-04-22 21:01:40 +00001048 for agent in self.get_agents_for_entry(entry):
1049 agent.abort()
1050 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001051
1052
showard324bf812009-01-20 23:23:38 +00001053 def _can_start_agent(self, agent, num_started_this_cycle,
1054 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001055 # always allow zero-process agents to run
1056 if agent.num_processes == 0:
1057 return True
1058 # don't allow any nonzero-process agents to run after we've reached a
1059 # limit (this avoids starvation of many-process agents)
1060 if have_reached_limit:
1061 return False
1062 # total process throttling
showard324bf812009-01-20 23:23:38 +00001063 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001064 return False
1065 # if a single agent exceeds the per-cycle throttling, still allow it to
1066 # run when it's the first agent in the cycle
1067 if num_started_this_cycle == 0:
1068 return True
1069 # per-cycle throttling
1070 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001071 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001072 return False
1073 return True
1074
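    # Illustrative note (added; the limit value is hypothetical): if
    # max_processes_started_per_cycle were 50, a 10-process agent could start
    # only while num_started_this_cycle is at most 40, except that the first
    # agent of a cycle is always allowed. Once an agent is refused,
    # _handle_agents() sets have_reached_limit, and no further nonzero-process
    # agents start this cycle (zero-process agents are still allowed).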
1075
jadmanski0afbb632008-06-06 21:10:57 +00001076 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001077 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001078 have_reached_limit = False
1079 # iterate over copy, so we can remove agents during iteration
1080 for agent in list(self._agents):
1081 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001082 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001083 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001084 continue
1085 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001086 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001087 have_reached_limit):
1088 have_reached_limit = True
1089 continue
showard4c5374f2008-09-04 17:02:56 +00001090 num_started_this_cycle += agent.num_processes
1091 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001092 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001093 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001094
1095
showard29f7cd22009-04-29 21:16:24 +00001096 def _process_recurring_runs(self):
1097 recurring_runs = models.RecurringRun.objects.filter(
1098 start_date__lte=datetime.datetime.now())
1099 for rrun in recurring_runs:
1100 # Create job from template
1101 job = rrun.job
1102 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001103 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001104
1105 host_objects = info['hosts']
1106 one_time_hosts = info['one_time_hosts']
1107 metahost_objects = info['meta_hosts']
1108 dependencies = info['dependencies']
1109 atomic_group = info['atomic_group']
1110
1111 for host in one_time_hosts or []:
1112 this_host = models.Host.create_one_time_host(host.hostname)
1113 host_objects.append(this_host)
1114
1115 try:
1116 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001117 options=options,
showard29f7cd22009-04-29 21:16:24 +00001118 host_objects=host_objects,
1119 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001120 atomic_group=atomic_group)
1121
1122 except Exception, ex:
1123 logging.exception(ex)
1124 #TODO send email
1125
1126 if rrun.loop_count == 1:
1127 rrun.delete()
1128 else:
1129 if rrun.loop_count != 0: # if not infinite loop
1130 # calculate new start_date
1131 difference = datetime.timedelta(seconds=rrun.loop_period)
1132 rrun.start_date = rrun.start_date + difference
1133 rrun.loop_count -= 1
1134 rrun.save()
1135
1136
showard170873e2009-01-07 00:22:26 +00001137class PidfileRunMonitor(object):
1138 """
1139 Client must call either run() to start a new process or
1140 attach_to_existing_process().
1141 """
mbligh36768f02008-02-22 18:28:33 +00001142
showard170873e2009-01-07 00:22:26 +00001143 class _PidfileException(Exception):
1144 """
1145 Raised when there's some unexpected behavior with the pid file, but only
1146 used internally (never allowed to escape this class).
1147 """
mbligh36768f02008-02-22 18:28:33 +00001148
1149
showard170873e2009-01-07 00:22:26 +00001150 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001151 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001152 self._start_time = None
1153 self.pidfile_id = None
1154 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001155
1156
showard170873e2009-01-07 00:22:26 +00001157 def _add_nice_command(self, command, nice_level):
1158 if not nice_level:
1159 return command
1160 return ['nice', '-n', str(nice_level)] + command
1161
1162
1163 def _set_start_time(self):
1164 self._start_time = time.time()
1165
1166
1167 def run(self, command, working_directory, nice_level=None, log_file=None,
1168 pidfile_name=None, paired_with_pidfile=None):
1169 assert command is not None
1170 if nice_level is not None:
1171 command = ['nice', '-n', str(nice_level)] + command
1172 self._set_start_time()
1173 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001174 command, working_directory, pidfile_name=pidfile_name,
1175 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001176
1177
showarded2afea2009-07-07 20:54:07 +00001178 def attach_to_existing_process(self, execution_path,
showardd3dc1992009-04-22 21:01:40 +00001179 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001180 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001181 self.pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +00001182 execution_path, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001183 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001184
1185
jadmanski0afbb632008-06-06 21:10:57 +00001186 def kill(self):
showard170873e2009-01-07 00:22:26 +00001187 if self.has_process():
1188 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001189
mbligh36768f02008-02-22 18:28:33 +00001190
showard170873e2009-01-07 00:22:26 +00001191 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001192 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001193 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001194
1195
showard170873e2009-01-07 00:22:26 +00001196 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001197 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001198 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001199 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001200
1201
showard170873e2009-01-07 00:22:26 +00001202 def _read_pidfile(self, use_second_read=False):
1203 assert self.pidfile_id is not None, (
1204 'You must call run() or attach_to_existing_process()')
1205 contents = _drone_manager.get_pidfile_contents(
1206 self.pidfile_id, use_second_read=use_second_read)
1207 if contents.is_invalid():
1208 self._state = drone_manager.PidfileContents()
1209 raise self._PidfileException(contents)
1210 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001211
1212
showard21baa452008-10-21 00:08:39 +00001213 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001214 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1215 self._state.process, self.pidfile_id, message)
showard170873e2009-01-07 00:22:26 +00001216 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001217 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001218
1219
1220 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001221 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001222 return
mblighbb421852008-03-11 22:36:16 +00001223
showard21baa452008-10-21 00:08:39 +00001224 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001225
showard170873e2009-01-07 00:22:26 +00001226 if self._state.process is None:
1227 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001228 return
mbligh90a549d2008-03-25 23:52:34 +00001229
showard21baa452008-10-21 00:08:39 +00001230 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001231 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001232 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001233 return
mbligh90a549d2008-03-25 23:52:34 +00001234
showard170873e2009-01-07 00:22:26 +00001235 # pid but no running process - maybe process *just* exited
1236 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001237 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001238 # autoserv exited without writing an exit code
1239 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001240 self._handle_pidfile_error(
1241 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001242
showard21baa452008-10-21 00:08:39 +00001243
1244 def _get_pidfile_info(self):
1245 """\
1246 After completion, self._state will contain:
1247 pid=None, exit_status=None if autoserv has not yet run
1248 pid!=None, exit_status=None if autoserv is running
1249 pid!=None, exit_status!=None if autoserv has completed
1250 """
1251 try:
1252 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001253 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001254 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001255
1256
showard170873e2009-01-07 00:22:26 +00001257 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001258 """\
1259 Called when no pidfile is found or no pid is in the pidfile.
1260 """
showard170873e2009-01-07 00:22:26 +00001261 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001262 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1263 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001264 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001265 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001266
1267
showard35162b02009-03-03 02:17:30 +00001268 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001269 """\
1270 Called when autoserv has exited without writing an exit status,
1271 or we've timed out waiting for autoserv to write a pid to the
1272 pidfile. In either case, we just return failure and the caller
1273 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001274
showard170873e2009-01-07 00:22:26 +00001275 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001276 """
1277 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001278 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001279 self._state.exit_status = 1
1280 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001284 self._get_pidfile_info()
1285 return self._state.exit_status
1286
1287
1288 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001289 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001290 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001291 if self._state.num_tests_failed is None:
1292 return -1
showard21baa452008-10-21 00:08:39 +00001293 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001294
1295
mbligh36768f02008-02-22 18:28:33 +00001296class Agent(object):
showard77182562009-06-10 00:16:05 +00001297 """
1298 An agent for use by the Dispatcher class to perform a sequence of tasks.
1299
1300 The following methods are required on all task objects:
1301 poll() - Called periodically to let the task check its status and
1302                 update its internal state, setting success once the task
                     has finished.
1303 is_done() - Returns True if the task is finished.
1304 abort() - Called when an abort has been requested. The task must
1305 set its aborted attribute to True if it actually aborted.
1306
1307 The following attributes are required on all task objects:
1308 aborted - bool, True if this task was aborted.
1309 failure_tasks - A sequence of tasks to be run using a new Agent
1310 by the dispatcher should this task fail.
1311 success - bool, True if this task succeeded.
1312 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1313 host_ids - A sequence of Host ids this task represents.
1314
1315 The following attribute is written to all task objects:
1316 agent - A reference to the Agent instance that the task has been
1317 added to.
1318 """
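    # Illustrative sketch only -- not from the original source.  A minimal
    # object satisfying the task interface documented above would look
    # roughly like this (AgentTask and DelayedCallTask below are the real
    # implementations):
    #
    #   class NoopTask(object):
    #       def __init__(self):
    #           self.aborted = False
    #           self.failure_tasks = ()
    #           self.success = False
    #           self.queue_entry_ids = ()
    #           self.host_ids = ()
    #           self.agent = None    # filled in by Agent.add_task()
    #           self._done = False
    #
    #       def poll(self):
    #           self.success = True
    #           self._done = True
    #
    #       def is_done(self):
    #           return self._done
    #
    #       def abort(self):
    #           self.aborted = True
    #           self._done = True
    #
    #   dispatcher.add_agent(Agent([NoopTask()]))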
1319
1320
showard170873e2009-01-07 00:22:26 +00001321 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001322 """
1323 @param tasks: A list of tasks as described in the class docstring.
1324 @param num_processes: The number of subprocesses the Agent represents.
1325 This is used by the Dispatcher for managing the load on the
1326 system. Defaults to 1.
1327 """
jadmanski0afbb632008-06-06 21:10:57 +00001328 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001329 self.queue = None
showard77182562009-06-10 00:16:05 +00001330 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001331 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001332 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001333
showard170873e2009-01-07 00:22:26 +00001334 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1335 for task in tasks)
1336 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1337
showardd3dc1992009-04-22 21:01:40 +00001338 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001339 for task in tasks:
1340 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001341
1342
showardd3dc1992009-04-22 21:01:40 +00001343 def _clear_queue(self):
1344 self.queue = Queue.Queue(0)
1345
1346
showard170873e2009-01-07 00:22:26 +00001347 def _union_ids(self, id_lists):
1348 return set(itertools.chain(*id_lists))
1349
1350
jadmanski0afbb632008-06-06 21:10:57 +00001351 def add_task(self, task):
1352 self.queue.put_nowait(task)
1353 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001354
1355
jadmanski0afbb632008-06-06 21:10:57 +00001356 def tick(self):
showard21baa452008-10-21 00:08:39 +00001357 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001358 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001359 self.active_task.poll()
1360 if not self.active_task.is_done():
1361 return
1362 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001363
1364
jadmanski0afbb632008-06-06 21:10:57 +00001365 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001366 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001367 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001368 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001369 if not self.active_task.success:
1370 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001371 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001372
jadmanski0afbb632008-06-06 21:10:57 +00001373 if not self.is_done():
1374 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001375
1376
jadmanski0afbb632008-06-06 21:10:57 +00001377 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001378 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001379 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1380 # get reset.
1381 new_agent = Agent(self.active_task.failure_tasks)
1382 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001383
mblighe2586682008-02-29 22:45:46 +00001384
showard4c5374f2008-09-04 17:02:56 +00001385 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001386 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001387
1388
jadmanski0afbb632008-06-06 21:10:57 +00001389 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001390 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001391
1392
showardd3dc1992009-04-22 21:01:40 +00001393 def abort(self):
showard08a36412009-05-05 01:01:13 +00001394 # abort tasks until the queue is empty or a task ignores the abort
1395 while not self.is_done():
1396 if not self.active_task:
1397 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001398 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001399 if not self.active_task.aborted:
1400 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001401 return
1402 self.active_task = None
1403
showardd3dc1992009-04-22 21:01:40 +00001404
showard77182562009-06-10 00:16:05 +00001405class DelayedCallTask(object):
1406 """
1407 A task object like AgentTask for an Agent to run that waits for the
1408 specified amount of time to have elapsed before calling the supplied
1409 callback once and finishing. If the callback returns anything, it is
1410 assumed to be a new Agent instance and will be added to the dispatcher.
1411
1412 @attribute end_time: The absolute posix time after which this task will
1413 call its callback when it is polled and be finished.
1414
1415 Also has all attributes required by the Agent class.
1416 """
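    # Illustrative usage sketch -- not in the original source; the callback
    # and host names are hypothetical:
    #
    #   def _cleanup_later():
    #       return Agent([CleanupTask(host=idle_host)])
    #
    #   delay_task = DelayedCallTask(delay_seconds=30,
    #                                callback=_cleanup_later)
    #   dispatcher.add_agent(Agent([delay_task], num_processes=0))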
1417 def __init__(self, delay_seconds, callback, now_func=None):
1418 """
1419 @param delay_seconds: The delay in seconds from now that this task
1420 will call the supplied callback and be done.
1421 @param callback: A callable to be called by this task once after at
1422 least delay_seconds time has elapsed. It must return None
1423 or a new Agent instance.
1424 @param now_func: A time.time like function. Default: time.time.
1425 Used for testing.
1426 """
1427 assert delay_seconds > 0
1428 assert callable(callback)
1429 if not now_func:
1430 now_func = time.time
1431 self._now_func = now_func
1432 self._callback = callback
1433
1434 self.end_time = self._now_func() + delay_seconds
1435
1436 # These attributes are required by Agent.
1437 self.aborted = False
1438 self.failure_tasks = ()
1439 self.host_ids = ()
1440 self.success = False
1441 self.queue_entry_ids = ()
1442 # This is filled in by Agent.add_task().
1443 self.agent = None
1444
1445
1446 def poll(self):
1447 if self._callback and self._now_func() >= self.end_time:
1448 new_agent = self._callback()
1449 if new_agent:
1450 self.agent.dispatcher.add_agent(new_agent)
1451 self._callback = None
1452 self.success = True
1453
1454
1455 def is_done(self):
1456 return not self._callback
1457
1458
1459 def abort(self):
1460 self.aborted = True
1461 self._callback = None
1462
1463
mbligh36768f02008-02-22 18:28:33 +00001464class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001465 def __init__(self, cmd=None, working_directory=None, failure_tasks=[],
1466 pidfile_name=None, paired_with_pidfile=None,
1467 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001468 self.done = False
1469 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001470 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001471 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001472 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001473 self.monitor = recover_run_monitor
1474 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001475 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001476 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001477 self.queue_entry_ids = []
1478 self.host_ids = []
1479 self.log_file = None
1480
1481
1482 def _set_ids(self, host=None, queue_entries=None):
1483 if queue_entries and queue_entries != [None]:
1484 self.host_ids = [entry.host.id for entry in queue_entries]
1485 self.queue_entry_ids = [entry.id for entry in queue_entries]
1486 else:
1487 assert host
1488 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001489
1490
jadmanski0afbb632008-06-06 21:10:57 +00001491 def poll(self):
showard08a36412009-05-05 01:01:13 +00001492 if not self.started:
1493 self.start()
1494 self.tick()
1495
1496
1497 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001498 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001499 exit_code = self.monitor.exit_code()
1500 if exit_code is None:
1501 return
1502 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001503 else:
1504 success = False
mbligh36768f02008-02-22 18:28:33 +00001505
jadmanski0afbb632008-06-06 21:10:57 +00001506 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001507
1508
jadmanski0afbb632008-06-06 21:10:57 +00001509 def is_done(self):
1510 return self.done
mbligh36768f02008-02-22 18:28:33 +00001511
1512
jadmanski0afbb632008-06-06 21:10:57 +00001513 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001514 if self.done:
1515 return
jadmanski0afbb632008-06-06 21:10:57 +00001516 self.done = True
1517 self.success = success
1518 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001519
1520
jadmanski0afbb632008-06-06 21:10:57 +00001521 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001522 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001523
mbligh36768f02008-02-22 18:28:33 +00001524
jadmanski0afbb632008-06-06 21:10:57 +00001525 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001526 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001527 _drone_manager.copy_to_results_repository(
1528 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001529
1530
jadmanski0afbb632008-06-06 21:10:57 +00001531 def epilog(self):
1532 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001533
1534
jadmanski0afbb632008-06-06 21:10:57 +00001535 def start(self):
1536 assert self.agent
1537
1538 if not self.started:
1539 self.prolog()
1540 self.run()
1541
1542 self.started = True
1543
1544
1545 def abort(self):
1546 if self.monitor:
1547 self.monitor.kill()
1548 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001549 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001550 self.cleanup()
1551
1552
showarded2afea2009-07-07 20:54:07 +00001553 def _get_consistent_execution_path(self, execution_entries):
1554 first_execution_path = execution_entries[0].execution_path()
1555 for execution_entry in execution_entries[1:]:
1556 assert execution_entry.execution_path() == first_execution_path, (
1557 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1558 execution_entry,
1559 first_execution_path,
1560 execution_entries[0]))
1561 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001562
1563
showarded2afea2009-07-07 20:54:07 +00001564 def _copy_results(self, execution_entries, use_monitor=None):
1565 """
1566 @param execution_entries: list of objects with execution_path() method
1567 """
showard6d1c1432009-08-20 23:30:39 +00001568 if use_monitor is not None and not use_monitor.has_process():
1569 return
1570
showarded2afea2009-07-07 20:54:07 +00001571 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001572 if use_monitor is None:
1573 assert self.monitor
1574 use_monitor = self.monitor
1575 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001576 execution_path = self._get_consistent_execution_path(execution_entries)
1577 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001578 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001579 results_path)
showardde634ee2009-01-30 01:44:24 +00001580
showarda1e74b32009-05-12 17:32:04 +00001581
1582 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001583 reparse_task = FinalReparseTask(queue_entries)
1584 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1585
1586
showarda1e74b32009-05-12 17:32:04 +00001587 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1588 self._copy_results(queue_entries, use_monitor)
1589 self._parse_results(queue_entries)
1590
1591
showardd3dc1992009-04-22 21:01:40 +00001592 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001593 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001594 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001595 self.monitor = PidfileRunMonitor()
1596 self.monitor.run(self.cmd, self._working_directory,
1597 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001598 log_file=self.log_file,
1599 pidfile_name=pidfile_name,
1600 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001601
1602
showardd9205182009-04-27 20:09:55 +00001603class TaskWithJobKeyvals(object):
1604 """AgentTask mixin providing functionality to help with job keyval files."""
1605 _KEYVAL_FILE = 'keyval'
1606 def _format_keyval(self, key, value):
1607 return '%s=%s' % (key, value)
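    # Illustrative example (not in the original source; timestamp hypothetical):
    #   _format_keyval('job_queued', 1249164000) -> 'job_queued=1249164000'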
1608
1609
1610 def _keyval_path(self):
1611 """Subclasses must override this"""
1612         raise NotImplementedError
1613
1614
1615 def _write_keyval_after_job(self, field, value):
1616 assert self.monitor
1617 if not self.monitor.has_process():
1618 return
1619 _drone_manager.write_lines_to_file(
1620 self._keyval_path(), [self._format_keyval(field, value)],
1621 paired_with_process=self.monitor.get_process())
1622
1623
1624 def _job_queued_keyval(self, job):
1625 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1626
1627
1628 def _write_job_finished(self):
1629 self._write_keyval_after_job("job_finished", int(time.time()))
1630
1631
showarded2afea2009-07-07 20:54:07 +00001632class SpecialAgentTask(AgentTask):
1633 """
1634 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1635 """
1636
1637 TASK_TYPE = None
1638 host = None
1639 queue_entry = None
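    # Illustrative sketch only -- not part of the original source.  A
    # hypothetical subclass shows the expected pattern: set host (and
    # optionally queue_entry) and TASK_TYPE before __init__ runs.
    # RepairTask, VerifyTask and CleanupTask below are the real examples.
    #
    #   class ExampleSpecialTask(SpecialAgentTask):
    #       TASK_TYPE = models.SpecialTask.Task.VERIFY
    #
    #       def __init__(self, host, task=None):
    #           self.host = host
    #           super(ExampleSpecialTask, self).__init__(task, ['-v'])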
1640
1641 def __init__(self, task, extra_command_args, **kwargs):
1642 assert self.host
1643         assert self.TASK_TYPE is not None, (
1644                 'self.TASK_TYPE must be overridden')
1645 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001646 if task:
1647 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001648 self._extra_command_args = extra_command_args
1649 super(SpecialAgentTask, self).__init__(**kwargs)
1650
1651
1652 def prolog(self):
1653 super(SpecialAgentTask, self).prolog()
1654 self.task = models.SpecialTask.prepare(self, self.task)
1655 self.cmd = _autoserv_command_line(self.host.hostname,
1656 self._extra_command_args,
1657 queue_entry=self.queue_entry)
1658 self._working_directory = self.task.execution_path()
1659 self.task.activate()
1660
1661
showardb6681aa2009-07-08 21:15:00 +00001662 def cleanup(self):
1663 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001664
1665 # self.task can be None if a SpecialAgentTask is aborted before the
1666 # prolog runs
1667 if self.task:
1668 self.task.finish()
1669
1670 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001671 self._copy_results([self.task])
1672
1673
1674class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
1675 TASK_TYPE = models.SpecialTask.Task.REPAIR
1676
1677
1678 def __init__(self, host, queue_entry=None, task=None,
1679 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001680 """\
showard170873e2009-01-07 00:22:26 +00001681 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001682 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001683 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001684 # normalize the protection name
1685 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001686
jadmanski0afbb632008-06-06 21:10:57 +00001687 self.host = host
showard58721a82009-08-20 23:32:40 +00001688 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001689
showarded2afea2009-07-07 20:54:07 +00001690 super(RepairTask, self).__init__(
1691 task, ['-R', '--host-protection', protection],
1692 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001693
showard2fe3f1d2009-07-06 20:19:11 +00001694 # *don't* include the queue entry in IDs -- if the queue entry is
1695 # aborted, we want to leave the repair task running
1696 self._set_ids(host=host)
1697
mbligh36768f02008-02-22 18:28:33 +00001698
jadmanski0afbb632008-06-06 21:10:57 +00001699 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001700 super(RepairTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001701 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001702 self.host.set_status('Repairing')
showard2fe3f1d2009-07-06 20:19:11 +00001703
mbligh36768f02008-02-22 18:28:33 +00001704
showardd9205182009-04-27 20:09:55 +00001705 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001706 return os.path.join(self._working_directory, self._KEYVAL_FILE)
showardd9205182009-04-27 20:09:55 +00001707
1708
showardde634ee2009-01-30 01:44:24 +00001709 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001710 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001711
showard2fe3f1d2009-07-06 20:19:11 +00001712 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001713 return # don't fail metahost entries, they'll be reassigned
1714
showard2fe3f1d2009-07-06 20:19:11 +00001715 self.queue_entry.update_from_database()
1716 if self.queue_entry.status != 'Queued':
showardccbd6c52009-03-21 00:10:21 +00001717 return # entry has been aborted
1718
showard2fe3f1d2009-07-06 20:19:11 +00001719 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001720 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001721 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001722 self._write_keyval_after_job(queued_key, queued_time)
1723 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001724 # copy results logs into the normal place for job results
1725 _drone_manager.copy_results_on_drone(
1726 self.monitor.get_process(),
showarded2afea2009-07-07 20:54:07 +00001727 source_path=self._working_directory + '/',
1728 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001729
showard2fe3f1d2009-07-06 20:19:11 +00001730 self._copy_results([self.queue_entry])
1731 if self.queue_entry.job.parse_failed_repair:
1732 self._parse_results([self.queue_entry])
1733 self.queue_entry.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001734
1735
jadmanski0afbb632008-06-06 21:10:57 +00001736 def epilog(self):
1737 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001738
jadmanski0afbb632008-06-06 21:10:57 +00001739 if self.success:
1740 self.host.set_status('Ready')
1741 else:
1742 self.host.set_status('Repair Failed')
showard2fe3f1d2009-07-06 20:19:11 +00001743 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001744 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001745
1746
showarded2afea2009-07-07 20:54:07 +00001747class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001748 def epilog(self):
1749 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001750 should_copy_results = (self.queue_entry and not self.success
1751 and not self.queue_entry.meta_host)
1752 if should_copy_results:
1753 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001754 log_name = os.path.basename(self.task.execution_path())
1755 source = os.path.join(self.task.execution_path(), 'debug',
1756 'autoserv.DEBUG')
1757 destination = os.path.join(self.queue_entry.execution_path(),
1758 log_name)
showard170873e2009-01-07 00:22:26 +00001759 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001760 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001761 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001762
showard58721a82009-08-20 23:32:40 +00001763 if not self.success and self.queue_entry:
1764 self.queue_entry.requeue()
1765
showard8fe93b52008-11-18 17:53:22 +00001766
1767class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001768 TASK_TYPE = models.SpecialTask.Task.VERIFY
1769
1770
1771 def __init__(self, queue_entry=None, host=None, task=None,
1772 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001773 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001774 self.host = host or queue_entry.host
1775 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001776
showarde788ea62008-11-17 21:02:47 +00001777 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showarded2afea2009-07-07 20:54:07 +00001778 super(VerifyTask, self).__init__(
1779 task, ['-v'], failure_tasks=failure_tasks,
1780 recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001781
showard170873e2009-01-07 00:22:26 +00001782 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001783
1784
jadmanski0afbb632008-06-06 21:10:57 +00001785 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001786 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001787
showardb18134f2009-03-20 20:52:18 +00001788 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001789 if self.queue_entry:
1790 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001791 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001792
showarded2afea2009-07-07 20:54:07 +00001793 # Delete any other queued verifies for this host. One verify will do
1794 # and there's no need to keep records of other requests.
1795 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001796 host__id=self.host.id,
1797 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001798 is_active=False, is_complete=False)
1799 queued_verifies = queued_verifies.exclude(id=self.task.id)
1800 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001801
mbligh36768f02008-02-22 18:28:33 +00001802
jadmanski0afbb632008-06-06 21:10:57 +00001803 def epilog(self):
1804 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001805 if self.success:
jadmanski0afbb632008-06-06 21:10:57 +00001806 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001807
1808
showardb5626452009-06-30 01:57:28 +00001809class CleanupHostsMixin(object):
1810 def _reboot_hosts(self, job, queue_entries, final_success,
1811 num_tests_failed):
1812 reboot_after = job.reboot_after
1813 do_reboot = (
1814 # always reboot after aborted jobs
1815 self._final_status == models.HostQueueEntry.Status.ABORTED
1816 or reboot_after == models.RebootAfter.ALWAYS
1817 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1818 and final_success and num_tests_failed == 0))
1819
1820 for queue_entry in queue_entries:
1821 if do_reboot:
1822 # don't pass the queue entry to the CleanupTask. if the cleanup
1823 # fails, the job doesn't care -- it's over.
1824 cleanup_task = CleanupTask(host=queue_entry.host)
1825 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1826 else:
1827 queue_entry.host.set_status('Ready')
1828
1829
1830class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showarded2afea2009-07-07 20:54:07 +00001831 def __init__(self, job, queue_entries, cmd=None, group_name='',
1832 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001833 self.job = job
1834 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001835 self.group_name = group_name
showarded2afea2009-07-07 20:54:07 +00001836 super(QueueTask, self).__init__(
1837 cmd, self._execution_path(),
1838 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001839 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001840
1841
showard73ec0442009-02-07 02:05:20 +00001842 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001843 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001844
1845
1846 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1847 keyval_contents = '\n'.join(self._format_keyval(key, value)
1848 for key, value in keyval_dict.iteritems())
1849 # always end with a newline to allow additional keyvals to be written
1850 keyval_contents += '\n'
showarded2afea2009-07-07 20:54:07 +00001851 _drone_manager.attach_file_to_execution(self._execution_path(),
showard73ec0442009-02-07 02:05:20 +00001852 keyval_contents,
1853 file_path=keyval_path)
1854
1855
1856 def _write_keyvals_before_job(self, keyval_dict):
1857 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1858
1859
showard170873e2009-01-07 00:22:26 +00001860 def _write_host_keyvals(self, host):
showarded2afea2009-07-07 20:54:07 +00001861 keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
showard170873e2009-01-07 00:22:26 +00001862 host.hostname)
1863 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001864 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1865 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001866
1867
showarded2afea2009-07-07 20:54:07 +00001868 def _execution_path(self):
1869 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001870
1871
jadmanski0afbb632008-06-06 21:10:57 +00001872 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001873 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001874 keyval_dict = {queued_key: queued_time}
1875 if self.group_name:
1876 keyval_dict['host_group_name'] = self.group_name
1877 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001878 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001879 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001880 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001881 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001882 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001883 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001884 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1885 # TODO(gps): Remove this if nothing needs it anymore.
1886 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001887 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001888
1889
showard35162b02009-03-03 02:17:30 +00001890 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001891 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001892 _drone_manager.write_lines_to_file(error_file_path,
1893 [_LOST_PROCESS_ERROR])
1894
1895
showardd3dc1992009-04-22 21:01:40 +00001896 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001897 if not self.monitor:
1898 return
1899
showardd9205182009-04-27 20:09:55 +00001900 self._write_job_finished()
1901
showard6d1c1432009-08-20 23:30:39 +00001902 gather_task = GatherLogsTask(self.job, self.queue_entries)
1903 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001904
1905 if self.monitor.lost_process:
1906 self._write_lost_process_error_file()
1907 for queue_entry in self.queue_entries:
1908 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001909
1910
showardcbd74612008-11-19 21:42:02 +00001911 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001912 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001913 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001914 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001915 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001916
1917
jadmanskif7fa2cc2008-10-01 14:13:23 +00001918 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001919 if not self.monitor or not self.monitor.has_process():
1920 return
1921
jadmanskif7fa2cc2008-10-01 14:13:23 +00001922 # build up sets of all the aborted_by and aborted_on values
1923 aborted_by, aborted_on = set(), set()
1924 for queue_entry in self.queue_entries:
1925 if queue_entry.aborted_by:
1926 aborted_by.add(queue_entry.aborted_by)
1927 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1928 aborted_on.add(t)
1929
1930 # extract some actual, unique aborted by value and write it out
1931 assert len(aborted_by) <= 1
1932 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001933 aborted_by_value = aborted_by.pop()
1934 aborted_on_value = max(aborted_on)
1935 else:
1936 aborted_by_value = 'autotest_system'
1937 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001938
showarda0382352009-02-11 23:36:43 +00001939 self._write_keyval_after_job("aborted_by", aborted_by_value)
1940 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001941
showardcbd74612008-11-19 21:42:02 +00001942 aborted_on_string = str(datetime.datetime.fromtimestamp(
1943 aborted_on_value))
1944 self._write_status_comment('Job aborted by %s on %s' %
1945 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001946
1947
jadmanski0afbb632008-06-06 21:10:57 +00001948 def abort(self):
1949 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001950 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001951 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001952
1953
jadmanski0afbb632008-06-06 21:10:57 +00001954 def epilog(self):
1955 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001956 self._finish_task()
1957 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001958
1959
showardd3dc1992009-04-22 21:01:40 +00001960class PostJobTask(AgentTask):
1961 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001962 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001963 self._queue_entries = queue_entries
1964 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001965
showarded2afea2009-07-07 20:54:07 +00001966 self._execution_path = self._get_consistent_execution_path(
1967 queue_entries)
1968 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001969 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001970 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001971 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1972
1973 if _testing_mode:
1974 command = 'true'
1975 else:
1976 command = self._generate_command(self._results_dir)
1977
showarded2afea2009-07-07 20:54:07 +00001978 super(PostJobTask, self).__init__(
1979 cmd=command, working_directory=self._execution_path,
1980 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001981
showarded2afea2009-07-07 20:54:07 +00001982 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001983 self._final_status = self._determine_final_status()
1984
1985
1986 def _generate_command(self, results_dir):
1987 raise NotImplementedError('Subclasses must override this')
1988
1989
1990 def _job_was_aborted(self):
1991 was_aborted = None
1992 for queue_entry in self._queue_entries:
1993 queue_entry.update_from_database()
1994 if was_aborted is None: # first queue entry
1995 was_aborted = bool(queue_entry.aborted)
1996 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1997 email_manager.manager.enqueue_notify_email(
1998 'Inconsistent abort state',
1999 'Queue entries have inconsistent abort state: ' +
2000                     ', '.join('%s (%s)' % (entry, entry.aborted)
                                   for entry in self._queue_entries))
2001 # don't crash here, just assume true
2002 return True
2003 return was_aborted
2004
2005
2006 def _determine_final_status(self):
2007 if self._job_was_aborted():
2008 return models.HostQueueEntry.Status.ABORTED
2009
2010 # we'll use a PidfileRunMonitor to read the autoserv exit status
2011 if self._autoserv_monitor.exit_code() == 0:
2012 return models.HostQueueEntry.Status.COMPLETED
2013 return models.HostQueueEntry.Status.FAILED
2014
2015
2016 def run(self):
showard5add1c82009-05-26 19:27:46 +00002017 # make sure we actually have results to work with.
2018 # this should never happen in normal operation.
2019 if not self._autoserv_monitor.has_process():
2020 email_manager.manager.enqueue_notify_email(
2021 'No results in post-job task',
2022 'No results in post-job task at %s' %
2023 self._autoserv_monitor.pidfile_id)
2024 self.finished(False)
2025 return
2026
2027 super(PostJobTask, self).run(
2028 pidfile_name=self._pidfile_name,
2029 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002030
2031
2032 def _set_all_statuses(self, status):
2033 for queue_entry in self._queue_entries:
2034 queue_entry.set_status(status)
2035
2036
2037 def abort(self):
2038 # override AgentTask.abort() to avoid killing the process and ending
2039 # the task. post-job tasks continue when the job is aborted.
2040 pass
2041
2042
showardb5626452009-06-30 01:57:28 +00002043class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002044 """
2045 Task responsible for
2046 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2047 * copying logs to the results repository
2048 * spawning CleanupTasks for hosts, if necessary
2049 * spawning a FinalReparseTask for the job
2050 """
showarded2afea2009-07-07 20:54:07 +00002051 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002052 self._job = job
2053 super(GatherLogsTask, self).__init__(
2054 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002055 logfile_name='.collect_crashinfo.log',
2056 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002057 self._set_ids(queue_entries=queue_entries)
2058
2059
2060 def _generate_command(self, results_dir):
2061 host_list = ','.join(queue_entry.host.hostname
2062 for queue_entry in self._queue_entries)
2063         return [_autoserv_path, '-p', '--collect-crashinfo', '-m', host_list,
2064 '-r', results_dir]
2065
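    # Illustrative example of the generated command -- not in the original
    # source; host names and results path are hypothetical:
    #   [_autoserv_path, '-p', '--collect-crashinfo',
    #    '-m', 'host1,host2', '-r', '/usr/local/autotest/results/123-user']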
2066
2067 def prolog(self):
2068 super(GatherLogsTask, self).prolog()
2069 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2070
2071
showardd3dc1992009-04-22 21:01:40 +00002072 def epilog(self):
2073 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002074
showard6d1c1432009-08-20 23:30:39 +00002075 self._copy_and_parse_results(self._queue_entries,
2076 use_monitor=self._autoserv_monitor)
2077
2078 if self._autoserv_monitor.has_process():
2079 final_success = (self._final_status ==
2080 models.HostQueueEntry.Status.COMPLETED)
2081 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2082 else:
2083 final_success = False
2084 num_tests_failed = 0
2085
showardb5626452009-06-30 01:57:28 +00002086 self._reboot_hosts(self._job, self._queue_entries, final_success,
2087 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002088
2089
showard0bbfc212009-04-29 21:06:13 +00002090 def run(self):
showard597bfd32009-05-08 18:22:50 +00002091 autoserv_exit_code = self._autoserv_monitor.exit_code()
2092 # only run if Autoserv exited due to some signal. if we have no exit
2093 # code, assume something bad (and signal-like) happened.
2094 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002095 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002096 else:
2097 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002098
2099
showard8fe93b52008-11-18 17:53:22 +00002100class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002101 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2102
2103
2104 def __init__(self, host=None, queue_entry=None, task=None,
2105 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002106 assert bool(host) ^ bool(queue_entry)
2107 if queue_entry:
2108 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002109 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002110 self.host = host
showard170873e2009-01-07 00:22:26 +00002111
showarde788ea62008-11-17 21:02:47 +00002112 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002113 super(CleanupTask, self).__init__(
2114 task, ['--cleanup'], failure_tasks=[repair_task],
2115 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002116
2117 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002118
mblighd5c95802008-03-05 00:33:46 +00002119
jadmanski0afbb632008-06-06 21:10:57 +00002120 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002121 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002122 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002123 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002124
mblighd5c95802008-03-05 00:33:46 +00002125
showard21baa452008-10-21 00:08:39 +00002126 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002127 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002128
showard21baa452008-10-21 00:08:39 +00002129 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002130 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002131 self.host.update_field('dirty', 0)
2132
2133
showardd3dc1992009-04-22 21:01:40 +00002134class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002135 _num_running_parses = 0
2136
showarded2afea2009-07-07 20:54:07 +00002137 def __init__(self, queue_entries, recover_run_monitor=None):
2138 super(FinalReparseTask, self).__init__(
2139 queue_entries, pidfile_name=_PARSER_PID_FILE,
2140 logfile_name='.parse.log',
2141 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002142 # don't use _set_ids, since we don't want to set the host_ids
2143 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002144 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002145
showard97aed502008-11-04 02:01:24 +00002146
2147 @classmethod
2148 def _increment_running_parses(cls):
2149 cls._num_running_parses += 1
2150
2151
2152 @classmethod
2153 def _decrement_running_parses(cls):
2154 cls._num_running_parses -= 1
2155
2156
2157 @classmethod
2158 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002159 return (cls._num_running_parses <
2160 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002161
2162
2163 def prolog(self):
2164 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002165 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002166
2167
2168 def epilog(self):
2169 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002170 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002171
2172
showardd3dc1992009-04-22 21:01:40 +00002173 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002174 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002175 results_dir]
showard97aed502008-11-04 02:01:24 +00002176
2177
showard08a36412009-05-05 01:01:13 +00002178 def tick(self):
2179 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002180 # and we can, at which point we revert to default behavior
2181 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002182 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002183 else:
2184 self._try_starting_parse()
2185
2186
2187 def run(self):
2188 # override run() to not actually run unless we can
2189 self._try_starting_parse()
2190
2191
2192 def _try_starting_parse(self):
2193 if not self._can_run_new_parse():
2194 return
showard170873e2009-01-07 00:22:26 +00002195
showard97aed502008-11-04 02:01:24 +00002196 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002197 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002198
showard97aed502008-11-04 02:01:24 +00002199 self._increment_running_parses()
2200 self._parse_started = True
2201
2202
2203 def finished(self, success):
2204 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002205 if self._parse_started:
2206 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002207
2208
showardc9ae1782009-01-30 01:42:37 +00002209class SetEntryPendingTask(AgentTask):
2210 def __init__(self, queue_entry):
2211 super(SetEntryPendingTask, self).__init__(cmd='')
2212 self._queue_entry = queue_entry
2213 self._set_ids(queue_entries=[queue_entry])
2214
2215
2216 def run(self):
2217 agent = self._queue_entry.on_pending()
2218 if agent:
2219 self.agent.dispatcher.add_agent(agent)
2220 self.finished(True)
2221
2222
showarda3c58572009-03-12 20:36:59 +00002223class DBError(Exception):
2224 """Raised by the DBObject constructor when its select fails."""
2225
2226
mbligh36768f02008-02-22 18:28:33 +00002227class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002228 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002229
2230 # Subclasses MUST override these:
2231 _table_name = ''
2232 _fields = ()
2233
showarda3c58572009-03-12 20:36:59 +00002234 # A mapping from (type, id) to the instance of the object for that
2235 # particular id. This prevents us from creating new Job() and Host()
2236 # instances for every HostQueueEntry object that we instantiate as
2237 # multiple HQEs often share the same Job.
2238 _instances_by_type_and_id = weakref.WeakValueDictionary()
2239 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002240
showarda3c58572009-03-12 20:36:59 +00002241
2242 def __new__(cls, id=None, **kwargs):
2243 """
2244 Look to see if we already have an instance for this particular type
2245 and id. If so, use it instead of creating a duplicate instance.
2246 """
2247 if id is not None:
2248 instance = cls._instances_by_type_and_id.get((cls, id))
2249 if instance:
2250 return instance
2251 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2252
2253
2254 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002255 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002256 assert self._table_name, '_table_name must be defined in your class'
2257 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002258 if not new_record:
2259 if self._initialized and not always_query:
2260 return # We've already been initialized.
2261 if id is None:
2262 id = row[0]
2263 # Tell future constructors to use us instead of re-querying while
2264 # this instance is still around.
2265 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002266
showard6ae5ea92009-02-25 00:11:51 +00002267 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002268
jadmanski0afbb632008-06-06 21:10:57 +00002269 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002270
jadmanski0afbb632008-06-06 21:10:57 +00002271 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002272 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002273
showarda3c58572009-03-12 20:36:59 +00002274 if self._initialized:
2275 differences = self._compare_fields_in_row(row)
2276 if differences:
showard7629f142009-03-27 21:02:02 +00002277 logging.warn(
2278 'initialized %s %s instance requery is updating: %s',
2279 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002280 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002281 self._initialized = True
2282
2283
2284 @classmethod
2285 def _clear_instance_cache(cls):
2286 """Used for testing, clear the internal instance cache."""
2287 cls._instances_by_type_and_id.clear()
2288
2289
showardccbd6c52009-03-21 00:10:21 +00002290 def _fetch_row_from_db(self, row_id):
2291 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2292 rows = _db.execute(sql, (row_id,))
2293 if not rows:
showard76e29d12009-04-15 21:53:10 +00002294 raise DBError("row not found (table=%s, row id=%s)"
2295 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002296 return rows[0]
2297
2298
showarda3c58572009-03-12 20:36:59 +00002299 def _assert_row_length(self, row):
2300 assert len(row) == len(self._fields), (
2301 "table = %s, row = %s/%d, fields = %s/%d" % (
2302 self.__table, row, len(row), self._fields, len(self._fields)))
2303
2304
2305 def _compare_fields_in_row(self, row):
2306 """
2307 Given a row as returned by a SELECT query, compare it to our existing
2308 in memory fields.
2309
2310 @param row - A sequence of values corresponding to fields named in
2311 The class attribute _fields.
2312
2313 @returns A dictionary listing the differences keyed by field name
2314 containing tuples of (current_value, row_value).
2315 """
2316 self._assert_row_length(row)
2317 differences = {}
2318 for field, row_value in itertools.izip(self._fields, row):
2319 current_value = getattr(self, field)
2320 if current_value != row_value:
2321 differences[field] = (current_value, row_value)
2322 return differences
showard2bab8f42008-11-12 18:15:22 +00002323
2324
2325 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002326 """
2327 Update our field attributes using a single row returned by SELECT.
2328
2329 @param row - A sequence of values corresponding to fields named in
2330 the class fields list.
2331 """
2332 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002333
showard2bab8f42008-11-12 18:15:22 +00002334 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002335 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002336 setattr(self, field, value)
2337 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002338
showard2bab8f42008-11-12 18:15:22 +00002339 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002340
mblighe2586682008-02-29 22:45:46 +00002341
showardccbd6c52009-03-21 00:10:21 +00002342 def update_from_database(self):
2343 assert self.id is not None
2344 row = self._fetch_row_from_db(self.id)
2345 self._update_fields_from_row(row)
2346
2347
jadmanski0afbb632008-06-06 21:10:57 +00002348 def count(self, where, table = None):
2349 if not table:
2350 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002351
jadmanski0afbb632008-06-06 21:10:57 +00002352 rows = _db.execute("""
2353 SELECT count(*) FROM %s
2354 WHERE %s
2355 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002356
jadmanski0afbb632008-06-06 21:10:57 +00002357 assert len(rows) == 1
2358
2359 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002360
2361
showardd3dc1992009-04-22 21:01:40 +00002362 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002363 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002364
showard2bab8f42008-11-12 18:15:22 +00002365 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002366 return
mbligh36768f02008-02-22 18:28:33 +00002367
mblighf8c624d2008-07-03 16:58:45 +00002368 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002369 _db.execute(query, (value, self.id))
2370
showard2bab8f42008-11-12 18:15:22 +00002371 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002372
2373
jadmanski0afbb632008-06-06 21:10:57 +00002374 def save(self):
2375 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002376 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002377 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002378 values = []
2379 for key in keys:
2380 value = getattr(self, key)
2381 if value is None:
2382 values.append('NULL')
2383 else:
2384 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002385 values_str = ','.join(values)
2386 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2387 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002388 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002389 # Update our id to the one the database just assigned to us.
2390 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002391
2392
jadmanski0afbb632008-06-06 21:10:57 +00002393 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002394         self._instances_by_type_and_id.pop((type(self), self.id), None)
2395 self._initialized = False
2396 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002397 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2398 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002399
2400
showard63a34772008-08-18 19:32:50 +00002401 @staticmethod
2402 def _prefix_with(string, prefix):
2403 if string:
2404 string = prefix + string
2405 return string
2406
2407
jadmanski0afbb632008-06-06 21:10:57 +00002408 @classmethod
showard989f25d2008-10-01 11:38:11 +00002409 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002410 """
2411 Construct instances of our class based on the given database query.
2412
2413 @yields One class instance for each row fetched.
2414 """
showard63a34772008-08-18 19:32:50 +00002415 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2416 where = cls._prefix_with(where, 'WHERE ')
2417 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002418 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002419 'joins' : joins,
2420 'where' : where,
2421 'order_by' : order_by})
2422 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002423 for row in rows:
2424 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002425
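# Illustrative usage of DBObject.fetch() (added example, not part of the
# original module). Subclasses inherit fetch(); the job_id variable below is
# assumed for the example:
#
#     for entry in HostQueueEntry.fetch(where='job_id = %s', params=(job_id,),
#                                       order_by='id'):
#         logging.info('entry %s has status %s', entry.id, entry.status)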
mbligh36768f02008-02-22 18:28:33 +00002426
2427class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002428 _table_name = 'ineligible_host_queues'
2429 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002430
2431
showard89f84db2009-03-12 20:39:13 +00002432class AtomicGroup(DBObject):
2433 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002434 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2435 'invalid')
showard89f84db2009-03-12 20:39:13 +00002436
2437
showard989f25d2008-10-01 11:38:11 +00002438class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002439 _table_name = 'labels'
2440 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002441 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002442
2443
showard6157c632009-07-06 20:19:31 +00002444 def __repr__(self):
2445 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2446 self.name, self.id, self.atomic_group_id)
2447
2448
mbligh36768f02008-02-22 18:28:33 +00002449class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002450 _table_name = 'hosts'
2451 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2452 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2453
2454
jadmanski0afbb632008-06-06 21:10:57 +00002455 def set_status(self, status):
showardb18134f2009-03-20 20:52:18 +00002456 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002457 self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00002458
2459
showard170873e2009-01-07 00:22:26 +00002460 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002461 """
showard170873e2009-01-07 00:22:26 +00002462 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002463 """
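        # Illustrative return value (added example): for a host labeled
        # 'bluetooth' plus the platform label 'netbook', this returns
        # ('netbook', ['bluetooth', 'netbook']) -- the platform label is
        # included in the full label list as well.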
2464 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002465 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002466 FROM labels
2467 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002468 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002469 ORDER BY labels.name
2470 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002471 platform = None
2472 all_labels = []
2473 for label_name, is_platform in rows:
2474 if is_platform:
2475 platform = label_name
2476 all_labels.append(label_name)
2477 return platform, all_labels
2478
2479
showard2fe3f1d2009-07-06 20:19:11 +00002480 def reverify_tasks(self):
2481 cleanup_task = CleanupTask(host=self)
2482 verify_task = VerifyTask(host=self)
2483
showard6d7b2ff2009-06-10 00:16:47 +00002484 # just to make sure this host does not get taken away
showard2fe3f1d2009-07-06 20:19:11 +00002485 self.set_status('Cleaning')
2486 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002487
2488
showard54c1ea92009-05-20 00:32:58 +00002489 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2490
2491
2492 @classmethod
2493 def cmp_for_sort(cls, a, b):
2494 """
2495 A comparison function for sorting Host objects by hostname.
2496
2497 This splits off any trailing numeric digits, ignores leading 0s and
2498 compares hostnames by the leading name and the trailing digits as a
2499 number. If either hostname does not match this pattern, the two are
2500 simply compared as lower case strings.
2501
2502 Example of how hostnames will be sorted:
2503
2504 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2505
2506 This hopefully satisfies most people's hostname sorting needs regardless
2507 of their exact naming schemes. Nobody sane should have both a host10
2508 and host010 (but the algorithm works regardless).
2509 """
2510 lower_a = a.hostname.lower()
2511 lower_b = b.hostname.lower()
2512 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2513 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2514 if match_a and match_b:
2515 name_a, number_a_str = match_a.groups()
2516 name_b, number_b_str = match_b.groups()
2517 number_a = int(number_a_str.lstrip('0'))
2518 number_b = int(number_b_str.lstrip('0'))
2519 result = cmp((name_a, number_a), (name_b, number_b))
2520 if result == 0 and lower_a != lower_b:
2521 # If they compared equal above but the lower case names are
2522 # indeed different, don't report equality. abc012 != abc12.
2523 return cmp(lower_a, lower_b)
2524 return result
2525 else:
2526 return cmp(lower_a, lower_b)
2527
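    # Illustrative usage of cmp_for_sort() (added example, not in the original
    # module), assuming `hosts` is a list of Host instances:
    #
    #     hosts.sort(cmp=Host.cmp_for_sort)
    #     # hostnames now ordered e.g. alice, host1, host2, host09, host010,
    #     # host10, host11, yolkfolk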
2528
mbligh36768f02008-02-22 18:28:33 +00002529class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002530 _table_name = 'host_queue_entries'
2531 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002532 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002533 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002534
2535
showarda3c58572009-03-12 20:36:59 +00002536 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002537 assert id or row
showarda3c58572009-03-12 20:36:59 +00002538 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002539 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002540
jadmanski0afbb632008-06-06 21:10:57 +00002541 if self.host_id:
2542 self.host = Host(self.host_id)
2543 else:
2544 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002545
showard77182562009-06-10 00:16:05 +00002546 if self.atomic_group_id:
2547 self.atomic_group = AtomicGroup(self.atomic_group_id,
2548 always_query=False)
2549 else:
2550 self.atomic_group = None
2551
showard170873e2009-01-07 00:22:26 +00002552 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002553 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002554
2555
showard89f84db2009-03-12 20:39:13 +00002556 @classmethod
2557 def clone(cls, template):
2558 """
2559 Creates a new row using the values from a template instance.
2560
2561 The new instance will not exist in the database or have a valid
2562 id attribute until its save() method is called.
2563 """
2564 assert isinstance(template, cls)
2565 new_row = [getattr(template, field) for field in cls._fields]
2566 clone = cls(row=new_row, new_record=True)
2567 clone.id = None
2568 return clone
2569
2570
showardc85c21b2008-11-24 22:17:37 +00002571 def _view_job_url(self):
2572 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2573
2574
showardf1ae3542009-05-11 19:26:02 +00002575 def get_labels(self):
2576 """
2577 Get all labels associated with this host queue entry (either via the
2578 meta_host or as a job dependency label). The labels yielded are not
2579 guaranteed to be unique.
2580
2581 @yields Label instances associated with this host_queue_entry.
2582 """
2583 if self.meta_host:
2584 yield Label(id=self.meta_host, always_query=False)
2585 labels = Label.fetch(
2586 joins="JOIN jobs_dependency_labels AS deps "
2587 "ON (labels.id = deps.label_id)",
2588 where="deps.job_id = %d" % self.job.id)
2589 for label in labels:
2590 yield label
2591
2592
jadmanski0afbb632008-06-06 21:10:57 +00002593 def set_host(self, host):
2594 if host:
2595 self.queue_log_record('Assigning host ' + host.hostname)
2596 self.update_field('host_id', host.id)
2597 self.update_field('active', True)
2598 self.block_host(host.id)
2599 else:
2600 self.queue_log_record('Releasing host')
2601 self.unblock_host(self.host.id)
2602 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002603
jadmanski0afbb632008-06-06 21:10:57 +00002604 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002605
2606
jadmanski0afbb632008-06-06 21:10:57 +00002607 def get_host(self):
2608 return self.host
mbligh36768f02008-02-22 18:28:33 +00002609
2610
jadmanski0afbb632008-06-06 21:10:57 +00002611 def queue_log_record(self, log_line):
2612 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002613 _drone_manager.write_lines_to_file(self.queue_log_path,
2614 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002615
2616
jadmanski0afbb632008-06-06 21:10:57 +00002617 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002618 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002619 row = [0, self.job.id, host_id]
2620 block = IneligibleHostQueue(row=row, new_record=True)
2621 block.save()
mblighe2586682008-02-29 22:45:46 +00002622
2623
jadmanski0afbb632008-06-06 21:10:57 +00002624 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002625 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002626 blocks = IneligibleHostQueue.fetch(
2627 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2628 for block in blocks:
2629 block.delete()
mblighe2586682008-02-29 22:45:46 +00002630
2631
showard2bab8f42008-11-12 18:15:22 +00002632 def set_execution_subdir(self, subdir=None):
2633 if subdir is None:
2634 assert self.get_host()
2635 subdir = self.get_host().hostname
2636 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002637
2638
showard6355f6b2008-12-05 18:52:13 +00002639 def _get_hostname(self):
2640 if self.host:
2641 return self.host.hostname
2642 return 'no host'
2643
2644
showard170873e2009-01-07 00:22:26 +00002645 def __str__(self):
2646 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2647
2648
jadmanski0afbb632008-06-06 21:10:57 +00002649 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002650 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002651
showardb18134f2009-03-20 20:52:18 +00002652 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002653
showardc85c21b2008-11-24 22:17:37 +00002654 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002655 self.update_field('complete', False)
2656 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002657
jadmanski0afbb632008-06-06 21:10:57 +00002658 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002659 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002660 self.update_field('complete', False)
2661 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002662
showardc85c21b2008-11-24 22:17:37 +00002663 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002664 self.update_field('complete', True)
2665 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002666
2667 should_email_status = (status.lower() in _notify_email_statuses or
2668 'all' in _notify_email_statuses)
2669 if should_email_status:
2670 self._email_on_status(status)
2671
2672 self._email_on_job_complete()
2673
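    # Summary of the status handling above (added comment): 'Queued' and
    # 'Parsing' mark the entry incomplete and inactive; 'Pending', 'Running',
    # 'Verifying', 'Starting' and 'Gathering' mark it active; 'Failed',
    # 'Completed', 'Stopped' and 'Aborted' mark it complete. Any status listed
    # in _notify_email_statuses triggers a per-entry email, and a job summary
    # email is sent once every entry in the job has completed.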
2674
2675 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002676 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002677
2678 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2679 self.job.id, self.job.name, hostname, status)
2680 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2681 self.job.id, self.job.name, hostname, status,
2682 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002683 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002684
2685
2686 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002687 if not self.job.is_finished():
2688 return
showard542e8402008-09-19 20:16:18 +00002689
showardc85c21b2008-11-24 22:17:37 +00002690 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002691 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002692 for queue_entry in hosts_queue:
2693 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002694 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002695 queue_entry.status))
2696
2697 summary_text = "\n".join(summary_text)
2698 status_counts = models.Job.objects.get_status_counts(
2699 [self.job.id])[self.job.id]
2700 status = ', '.join('%d %s' % (count, status) for status, count
2701 in status_counts.iteritems())
2702
2703 subject = 'Autotest: Job ID: %s "%s" %s' % (
2704 self.job.id, self.job.name, status)
2705 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2706 self.job.id, self.job.name, status, self._view_job_url(),
2707 summary_text)
showard170873e2009-01-07 00:22:26 +00002708 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002709
2710
showard77182562009-06-10 00:16:05 +00002711 def run_pre_job_tasks(self, assigned_host=None):
2712 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002713 assert assigned_host
2714 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002715 if self.host_id is None:
2716 self.set_host(assigned_host)
2717 else:
2718 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002719
showardcfd4a7e2009-07-11 01:47:33 +00002720 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002721 self.job.name, self.meta_host, self.atomic_group_id,
2722 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002723
showard77182562009-06-10 00:16:05 +00002724 return self._do_run_pre_job_tasks()
2725
2726
2727 def _do_run_pre_job_tasks(self):
2728 # Every host goes through the Verifying stage (which may or may not
2729 # actually do anything as determined by get_pre_job_tasks).
2730 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2731
2732 # The pre job tasks always end with a SetEntryPendingTask which
2733 # will continue as appropriate through queue_entry.on_pending().
2734 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002735
showard6ae5ea92009-02-25 00:11:51 +00002736
jadmanski0afbb632008-06-06 21:10:57 +00002737 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002738 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002739 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002740 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002741 # verify/cleanup failure sets the execution subdir, so reset it here
2742 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002743 if self.meta_host:
2744 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002745
2746
jadmanski0afbb632008-06-06 21:10:57 +00002747 def handle_host_failure(self):
2748 """\
2749 Called when this queue entry's host has failed verification and
2750 repair.
2751 """
2752 assert not self.meta_host
2753 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002754 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002755
2756
jadmanskif7fa2cc2008-10-01 14:13:23 +00002757 @property
2758 def aborted_by(self):
2759 self._load_abort_info()
2760 return self._aborted_by
2761
2762
2763 @property
2764 def aborted_on(self):
2765 self._load_abort_info()
2766 return self._aborted_on
2767
2768
2769 def _load_abort_info(self):
2770 """ Fetch info about who aborted the job. """
2771 if hasattr(self, "_aborted_by"):
2772 return
2773 rows = _db.execute("""
2774 SELECT users.login, aborted_host_queue_entries.aborted_on
2775 FROM aborted_host_queue_entries
2776 INNER JOIN users
2777 ON users.id = aborted_host_queue_entries.aborted_by_id
2778 WHERE aborted_host_queue_entries.queue_entry_id = %s
2779 """, (self.id,))
2780 if rows:
2781 self._aborted_by, self._aborted_on = rows[0]
2782 else:
2783 self._aborted_by = self._aborted_on = None
2784
2785
showardb2e2c322008-10-14 17:33:55 +00002786 def on_pending(self):
2787 """
2788 Called when an entry in a synchronous job has passed verify. If the
2789 job is ready to run, returns an agent to run the job. Returns None
2790 otherwise.
2791 """
2792 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002793 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002794
2795 # Some debug code here: sends an email if an asynchronous job does not
2796 # immediately enter Starting.
2797 # TODO: Remove this once we figure out why asynchronous jobs are getting
2798 # stuck in Pending.
2799 agent = self.job.run_if_ready(queue_entry=self)
2800 if self.job.synch_count == 1 and agent is None:
2801 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2802 message = 'Asynchronous job stuck in Pending'
2803 email_manager.manager.enqueue_notify_email(subject, message)
2804 return agent
showardb2e2c322008-10-14 17:33:55 +00002805
2806
showardd3dc1992009-04-22 21:01:40 +00002807 def abort(self, dispatcher):
2808 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002809
showardd3dc1992009-04-22 21:01:40 +00002810 Status = models.HostQueueEntry.Status
2811 has_running_job_agent = (
2812 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2813 and dispatcher.get_agents_for_entry(self))
2814 if has_running_job_agent:
2815 # do nothing; post-job tasks will finish and then mark this entry
2816 # with status "Aborted" and take care of the host
2817 return
2818
2819 if self.status in (Status.STARTING, Status.PENDING):
2820 self.host.set_status(models.Host.Status.READY)
2821 elif self.status == Status.VERIFYING:
2822 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2823
2824 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002825
2826 def execution_tag(self):
2827 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002828 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002829
2830
showarded2afea2009-07-07 20:54:07 +00002831 def execution_path(self):
2832 return self.execution_tag()
2833
2834
mbligh36768f02008-02-22 18:28:33 +00002835class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002836 _table_name = 'jobs'
2837 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2838 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002839 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002840 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002841
showard77182562009-06-10 00:16:05 +00002842 # This does not need to be a column in the DB. The delays are likely to
2843 # be configured short. If the scheduler is stopped and restarted in
2844 # the middle of a job's delay cycle, the delay cycle will either be
2845 # repeated or skipped depending on the number of Pending machines found
2846 # when the restarted scheduler recovers to track it. Not a problem.
2847 #
2848 # A reference to the DelayedCallTask that will wake up the job should
2849 # no other HQEs change state in time. Its end_time attribute is used
2850 # by our run_with_ready_delay() method to determine if the wait is over.
2851 _delay_ready_task = None
2852
2853 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2854 # all status='Pending' atomic group HQEs in case a delay was running when the
2855 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002856
showarda3c58572009-03-12 20:36:59 +00002857 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002858 assert id or row
showarda3c58572009-03-12 20:36:59 +00002859 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002860
mblighe2586682008-02-29 22:45:46 +00002861
jadmanski0afbb632008-06-06 21:10:57 +00002862 def is_server_job(self):
2863 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002864
2865
showard170873e2009-01-07 00:22:26 +00002866 def tag(self):
2867 return "%s-%s" % (self.id, self.owner)
2868
2869
jadmanski0afbb632008-06-06 21:10:57 +00002870 def get_host_queue_entries(self):
2871 rows = _db.execute("""
2872 SELECT * FROM host_queue_entries
2873 WHERE job_id= %s
2874 """, (self.id,))
2875 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002876
jadmanski0afbb632008-06-06 21:10:57 +00002877 assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002878
jadmanski0afbb632008-06-06 21:10:57 +00002879 return entries
mbligh36768f02008-02-22 18:28:33 +00002880
2881
jadmanski0afbb632008-06-06 21:10:57 +00002882 def set_status(self, status, update_queues=False):
2883 self.update_field('status', status)
2884
2885 if update_queues:
2886 for queue_entry in self.get_host_queue_entries():
2887 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002888
2889
showard77182562009-06-10 00:16:05 +00002890 def _atomic_and_has_started(self):
2891 """
2892 @returns True if any of the HostQueueEntries associated with this job
2893 have entered the Status.STARTING state or beyond.
2894 """
2895 atomic_entries = models.HostQueueEntry.objects.filter(
2896 job=self.id, atomic_group__isnull=False)
2897 if atomic_entries.count() <= 0:
2898 return False
2899
showardaf8b4ca2009-06-16 18:47:26 +00002900 # These states may *only* be reached if Job.run() has been called.
2901 started_statuses = (models.HostQueueEntry.Status.STARTING,
2902 models.HostQueueEntry.Status.RUNNING,
2903 models.HostQueueEntry.Status.COMPLETED)
2904
2905 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002906 return started_entries.count() > 0
2907
2908
showard708b3522009-08-20 23:26:15 +00002909 def _hosts_assigned_count(self):
2910 """The number of HostQueueEntries assigned a Host for this job."""
2911 entries = models.HostQueueEntry.objects.filter(job=self.id,
2912 host__isnull=False)
2913 return entries.count()
2914
2915
showard77182562009-06-10 00:16:05 +00002916 def _pending_count(self):
2917 """The number of HostQueueEntries for this job in the Pending state."""
2918 pending_entries = models.HostQueueEntry.objects.filter(
2919 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2920 return pending_entries.count()
2921
2922
jadmanski0afbb632008-06-06 21:10:57 +00002923 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002924 # NOTE: Atomic group jobs stop reporting ready after they have been
2925 # started to avoid launching multiple copies of one atomic job.
2926 # Only possible if synch_count is less than half the number of
2927 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00002928 pending_count = self._pending_count()
2929 atomic_and_has_started = self._atomic_and_has_started()
2930 ready = (pending_count >= self.synch_count
2931 and not atomic_and_has_started)
2932
2933 if not ready:
2934 logging.info(
2935 'Job %s not ready: %s pending, %s required '
2936 '(Atomic and started: %s)',
2937 self, pending_count, self.synch_count,
2938 atomic_and_has_started)
2939
2940 return ready
mbligh36768f02008-02-22 18:28:33 +00002941
2942
jadmanski0afbb632008-06-06 21:10:57 +00002943 def num_machines(self, clause=None):
2944 sql = "job_id=%s" % self.id
2945 if clause:
2946 sql += " AND (%s)" % clause
2947 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002948
2949
jadmanski0afbb632008-06-06 21:10:57 +00002950 def num_queued(self):
2951 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002952
2953
jadmanski0afbb632008-06-06 21:10:57 +00002954 def num_active(self):
2955 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002956
2957
jadmanski0afbb632008-06-06 21:10:57 +00002958 def num_complete(self):
2959 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002960
2961
jadmanski0afbb632008-06-06 21:10:57 +00002962 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002963 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002964
mbligh36768f02008-02-22 18:28:33 +00002965
showard6bb7c292009-01-30 01:44:51 +00002966 def _not_yet_run_entries(self, include_verifying=True):
2967 statuses = [models.HostQueueEntry.Status.QUEUED,
2968 models.HostQueueEntry.Status.PENDING]
2969 if include_verifying:
2970 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2971 return models.HostQueueEntry.objects.filter(job=self.id,
2972 status__in=statuses)
2973
2974
2975 def _stop_all_entries(self):
2976 entries_to_stop = self._not_yet_run_entries(
2977 include_verifying=False)
2978 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002979 assert not child_entry.complete, (
2980 '%s status=%s, active=%s, complete=%s' %
2981 (child_entry.id, child_entry.status, child_entry.active,
2982 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002983 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2984 child_entry.host.status = models.Host.Status.READY
2985 child_entry.host.save()
2986 child_entry.status = models.HostQueueEntry.Status.STOPPED
2987 child_entry.save()
2988
showard2bab8f42008-11-12 18:15:22 +00002989 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002990 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002991 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002992 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002993
2994
jadmanski0afbb632008-06-06 21:10:57 +00002995 def write_to_machines_file(self, queue_entry):
2996 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002997 file_path = os.path.join(self.tag(), '.machines')
2998 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002999
3000
showardf1ae3542009-05-11 19:26:02 +00003001 def _next_group_name(self, group_name=''):
3002 """@returns a directory name to use for the next host group results."""
3003 if group_name:
3004 # Sanitize for use as a pathname.
3005 group_name = group_name.replace(os.path.sep, '_')
3006 if group_name.startswith('.'):
3007 group_name = '_' + group_name[1:]
3008 # Add a separator between the group name and 'group%d'.
3009 group_name += '.'
3010 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003011 query = models.HostQueueEntry.objects.filter(
3012 job=self.id).values('execution_subdir').distinct()
3013 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003014 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3015 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003016 if ids:
3017 next_id = max(ids) + 1
3018 else:
3019 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003020 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00003021
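    # Illustrative results of _next_group_name() (added example): with no
    # group_name the subdirs produced are 'group0', 'group1', ...; with
    # group_name='rack1' they become 'rack1.group0', 'rack1.group1', ...
    # ('rack1' is a hypothetical label name used only for this example).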
3022
showard170873e2009-01-07 00:22:26 +00003023 def _write_control_file(self, execution_tag):
3024 control_path = _drone_manager.attach_file_to_execution(
3025 execution_tag, self.control_file)
3026 return control_path
mbligh36768f02008-02-22 18:28:33 +00003027
showardb2e2c322008-10-14 17:33:55 +00003028
showard2bab8f42008-11-12 18:15:22 +00003029 def get_group_entries(self, queue_entry_from_group):
3030 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003031 return list(HostQueueEntry.fetch(
3032 where='job_id=%s AND execution_subdir=%s',
3033 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003034
3035
showardb2e2c322008-10-14 17:33:55 +00003036 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003037 assert queue_entries
3038 execution_tag = queue_entries[0].execution_tag()
3039 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00003040 hostnames = ','.join([entry.get_host().hostname
3041 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003042
showard87ba02a2009-04-20 19:37:32 +00003043 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003044 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003045 ['-P', execution_tag, '-n',
3046 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003047 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003048
jadmanski0afbb632008-06-06 21:10:57 +00003049 if not self.is_server_job():
3050 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003051
showardb2e2c322008-10-14 17:33:55 +00003052 return params
mblighe2586682008-02-29 22:45:46 +00003053
mbligh36768f02008-02-22 18:28:33 +00003054
showardc9ae1782009-01-30 01:42:37 +00003055 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003056 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003057 return True
showard0fc38302008-10-23 00:44:07 +00003058 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003059 return queue_entry.get_host().dirty
3060 return False
showard21baa452008-10-21 00:08:39 +00003061
showardc9ae1782009-01-30 01:42:37 +00003062
showard2fe3f1d2009-07-06 20:19:11 +00003063 def should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003064 do_not_verify = (queue_entry.host.protection ==
3065 host_protections.Protection.DO_NOT_VERIFY)
3066 if do_not_verify:
3067 return False
3068 return self.run_verify
3069
3070
showard77182562009-06-10 00:16:05 +00003071 def get_pre_job_tasks(self, queue_entry):
3072 """
3073 Get a list of tasks to perform before the host_queue_entry
3074 may be used to run this Job (such as Cleanup & Verify).
3075
3076 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003077 it should be considered ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003078 task in the list calls HostQueueEntry.on_pending(), which
3079 continues the flow of the job.
3080 """
showard21baa452008-10-21 00:08:39 +00003081 tasks = []
showardc9ae1782009-01-30 01:42:37 +00003082 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00003083 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2fe3f1d2009-07-06 20:19:11 +00003084 if self.should_run_verify(queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003085 tasks.append(VerifyTask(queue_entry=queue_entry))
3086 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00003087 return tasks
3088
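    # Typical task list returned by get_pre_job_tasks() (added comment):
    # [CleanupTask, VerifyTask, SetEntryPendingTask], where CleanupTask and
    # VerifyTask are included only when _should_run_cleanup() and
    # should_run_verify() return True; SetEntryPendingTask is always last so
    # that queue_entry.on_pending() continues the flow of the job.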
3089
showardf1ae3542009-05-11 19:26:02 +00003090 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003091 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003092 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003093 else:
showardf1ae3542009-05-11 19:26:02 +00003094 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003095 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003096 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003097 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003098
3099 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003100 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003101
3102
3103 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003104 """
3105 @returns A tuple containing a list of HostQueueEntry instances to be
3106 used to run this Job, and a string group name to suggest for
showard54c1ea92009-05-20 00:32:58 +00003107 this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00003108 """
showard77182562009-06-10 00:16:05 +00003109 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003110 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003111 if atomic_group:
3112 num_entries_wanted = atomic_group.max_number_of_machines
3113 else:
3114 num_entries_wanted = self.synch_count
3115 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003116
showardf1ae3542009-05-11 19:26:02 +00003117 if num_entries_wanted > 0:
3118 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003119 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003120 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003121 params=(self.id, include_queue_entry.id)))
3122
3123 # Sort the chosen hosts by hostname before slicing.
3124 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3125 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3126 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3127 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003128
showardf1ae3542009-05-11 19:26:02 +00003129 # Sanity check. We'll only ever be called if this can be met.
3130 assert len(chosen_entries) >= self.synch_count
3131
3132 if atomic_group:
3133 # Look at any meta_host and dependency labels and pick the first
3134 # one that also specifies this atomic group. Use that label name
3135 # as the group name if possible (it is more specific).
3136 group_name = atomic_group.name
3137 for label in include_queue_entry.get_labels():
3138 if label.atomic_group_id:
3139 assert label.atomic_group_id == atomic_group.id
3140 group_name = label.name
3141 break
3142 else:
3143 group_name = ''
3144
3145 self._assign_new_group(chosen_entries, group_name=group_name)
3146 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003147
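    # Selection summary for _choose_group_to_run() (added comment): the
    # triggering entry is always included, then Pending entries for the same
    # job are added in hostname order until synch_count entries (or, for an
    # atomic group, up to max_number_of_machines entries) have been chosen.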
3148
showard77182562009-06-10 00:16:05 +00003149 def run_if_ready(self, queue_entry):
3150 """
3151 @returns An Agent instance to ultimately run this job if enough hosts
3152 are ready for it to run.
3153 @returns None and potentially cleans up excess hosts if this Job
3154 is not ready to run.
3155 """
showardb2e2c322008-10-14 17:33:55 +00003156 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003157 self.stop_if_necessary()
3158 return None
mbligh36768f02008-02-22 18:28:33 +00003159
showard77182562009-06-10 00:16:05 +00003160 if queue_entry.atomic_group:
3161 return self.run_with_ready_delay(queue_entry)
3162
3163 return self.run(queue_entry)
3164
3165
3166 def run_with_ready_delay(self, queue_entry):
3167 """
3168 Start a delay to wait for more hosts to enter Pending state before
3169 launching an atomic group job. Once set, the delay cannot be reset.
3170
3171 @param queue_entry: The HostQueueEntry object to get atomic group
3172 info from and pass to run_if_ready when the delay is up.
3173
3174 @returns An Agent to run the job as appropriate or None if a delay
3175 has already been set.
3176 """
3177 assert queue_entry.job_id == self.id
3178 assert queue_entry.atomic_group
3179 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showard708b3522009-08-20 23:26:15 +00003180 pending_threshold = min(self._hosts_assigned_count(),
3181 queue_entry.atomic_group.max_number_of_machines)
showard77182562009-06-10 00:16:05 +00003182 over_max_threshold = (self._pending_count() >= pending_threshold)
3183 delay_expired = (self._delay_ready_task and
3184 time.time() >= self._delay_ready_task.end_time)
3185
3186 # Delay is disabled or we already have enough? Do not wait to run.
3187 if not delay or over_max_threshold or delay_expired:
3188 return self.run(queue_entry)
3189
3190 # A delay was previously scheduled.
3191 if self._delay_ready_task:
3192 return None
3193
3194 def run_job_after_delay():
3195 logging.info('Job %s done waiting for extra hosts.', self.id)
3196 return self.run(queue_entry)
3197
showard708b3522009-08-20 23:26:15 +00003198 logging.info('Job %s waiting up to %s seconds for more hosts.',
3199 self.id, delay)
showard77182562009-06-10 00:16:05 +00003200 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3201 callback=run_job_after_delay)
3202
3203 return Agent([self._delay_ready_task], num_processes=0)
3204
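    # Decision summary for run_with_ready_delay() (added comment): run
    # immediately when the configured delay is 0, when enough entries are
    # already Pending, or when a previously started delay has expired;
    # otherwise schedule at most one DelayedCallTask whose callback,
    # run_job_after_delay(), calls run() once the delay elapses.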
3205
3206 def run(self, queue_entry):
3207 """
3208 @param queue_entry: The HostQueueEntry instance calling this method.
3209 @returns An Agent instance to run this job or None if we've already
3210 been run.
3211 """
3212 if queue_entry.atomic_group and self._atomic_and_has_started():
3213 logging.error('Job.run() called on running atomic Job %d '
3214 'with HQE %s.', self.id, queue_entry)
3215 return None
showardf1ae3542009-05-11 19:26:02 +00003216 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3217 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003218
3219
showardf1ae3542009-05-11 19:26:02 +00003220 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003221 for queue_entry in queue_entries:
3222 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003223 params = self._get_autoserv_params(queue_entries)
3224 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003225 cmd=params, group_name=group_name)
3226 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003227 if self._delay_ready_task:
3228 # Cancel any pending callback that would try to run again
3229 # as we are already running.
3230 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003231
showard170873e2009-01-07 00:22:26 +00003232 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003233
3234
showardb000a8d2009-07-28 20:02:07 +00003235 def __str__(self):
3236 return '%s-%s' % (self.id, self.owner)
3237
3238
mbligh36768f02008-02-22 18:28:33 +00003239if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003240 main()