#!/usr/bin/python -u

"""
Autotest scheduler
"""


import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
import itertools, logging, weakref
import common
import MySQLdb
from autotest_lib.scheduler import scheduler_logging_config
from autotest_lib.frontend import setup_django_environment
from autotest_lib.client.common_lib import global_config, logging_manager
from autotest_lib.client.common_lib import host_protections, utils
from autotest_lib.database import database_connection
from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
from autotest_lib.scheduler import drone_manager, drones, email_manager
from autotest_lib.scheduler import monitor_db_cleanup
from autotest_lib.scheduler import status_server, scheduler_config


RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()


def _site_init_monitor_db_dummy():
    return {}


def main():
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise


def main_without_exception_handling():
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        logging.error(msg)
        sys.exit(1)
101
jadmanski0afbb632008-06-06 21:10:57 +0000102 global RESULTS_DIR
103 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh83c1e9e2009-05-01 23:10:41 +0000105 site_init = utils.import_site_function(__file__,
106 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
107 _site_init_monitor_db_dummy)
108 site_init()
109
    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()


def setup_logging():
    log_dir = os.environ.get('AUTOTEST_SCHEDULER_LOG_DIR', None)
    log_name = os.environ.get('AUTOTEST_SCHEDULER_LOG_NAME', None)
    logging_manager.configure_logging(
        scheduler_logging_config.SchedulerLoggingConfig(), log_dir=log_dir,
        logfile_name=log_name)


def handle_sigint(signum, frame):
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")


def init():
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")


def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    @returns The autoserv command line as a list of executable + parameters.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    """
    autoserv_argv = [_autoserv_path, '-p', '-m', machines,
                     '-r', drone_manager.WORKING_DIRECTORY]
    if job or queue_entry:
        if not job:
            job = queue_entry.job
        autoserv_argv += ['-u', job.owner, '-l', job.name]
    if verbose:
        autoserv_argv.append('--verbose')
    return autoserv_argv + extra_args

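# Illustrative sketch only (not part of the original module): assuming the
# default install prefix and a hypothetical job named 'sleeptest' owned by
# 'debug_user', _autoserv_command_line('host1', [], job=that_job) would yield
# something like:
#     ['/usr/local/autotest/server/autoserv', '-p', '-m', 'host1',
#      '-r', drone_manager.WORKING_DIRECTORY,
#      '-u', 'debug_user', '-l', 'sleeptest', '--verbose']
# The exact executable path depends on the local drones.AUTOTEST_INSTALL_DIR.

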
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""


class HostScheduler(object):
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result

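    # Illustrative note (not part of the original module): given rows such as
    # [(1, 10), (1, 11), (2, 10)], _process_many2many_dict() returns
    # {1: set([10, 11]), 2: set([10])}; with flip=True the mapping is keyed by
    # the second column instead: {10: set([1, 2]), 11: set([1])}.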

    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels, if any, or None otherwise.  Logs an error if more than one
        atomic group is found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()

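    # Sketch of the intent (illustrative only): a host carrying labels such as
    # ['rack1', 'atomic-group-A'], where only 'atomic-group-A' has an
    # atomic_group_id, makes _get_host_atomic_group_id() return that id; a host
    # carrying labels from two different atomic groups triggers the error log
    # above and one of the ids is returned arbitrarily via set.pop().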

    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []


class Dispatcher(object):
    def __init__(self):
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        self._host_agents = {}
        self._queue_entry_agents = {}


    def initialize(self, recover_hosts=True):
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()


    def tick(self):
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()


    def _run_cleanup(self):
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()


    def _register_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            agent_dict.setdefault(object_id, set()).add(agent)


    def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
        for object_id in object_ids:
            assert object_id in agent_dict
            agent_dict[object_id].remove(agent)


    def add_agent(self, agent):
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)


    def get_agents_for_entry(self, queue_entry):
        """
        Find agents corresponding to the specified queue_entry.
        """
        return list(self._queue_entry_agents.get(queue_entry.id, set()))


    def host_has_agent(self, host):
        """
        Determine if there is currently an Agent present using this host.
        """
        return bool(self._host_agents.get(host.id, None))


    def remove_agent(self, agent):
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)


    def _recover_processes(self):
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_starting_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()


    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        special_tasks = models.SpecialTask.objects.filter(is_active=True)
        for execution_entry in itertools.chain(queue_entries, special_tasks):
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    execution_entry.execution_path(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)


    def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
        run_monitor = PidfileRunMonitor()
        run_monitor.attach_to_existing_process(execution_path,
                                               pidfile_name=pidfile_name)
        if run_monitor.has_process():
            orphans.discard(run_monitor.get_process())
            return run_monitor, '(process %s)' % run_monitor.get_process()
        return None, 'without process'


    def _recover_entries_with_status(self, status, orphans, pidfile_name,
                                     recover_entries_fn):
        queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
        for queue_entry in queue_entries:
            if self.get_agents_for_entry(queue_entry):
                # synchronous job we've already recovered
                continue
            queue_entries = queue_entry.job.get_group_entries(queue_entry)
            run_monitor, process_string = self._get_recovery_run_monitor(
                queue_entry.execution_path(), pidfile_name, orphans)

            logging.info('Recovering %s entry %s %s', status.lower(),
                         ', '.join(str(entry) for entry in queue_entries),
                         process_string)
            recover_entries_fn(queue_entry.job, queue_entries, run_monitor)


    def _check_for_remaining_orphan_processes(self, orphans):
        if not orphans:
            return
        subject = 'Unrecovered orphan autoserv processes remain'
        message = '\n'.join(str(process) for process in orphans)
        email_manager.manager.enqueue_notify_email(subject, message)

        die_on_orphans = global_config.global_config.get_config_value(
            scheduler_config.CONFIG_SECTION, 'die_on_orphans', type=bool)

        if die_on_orphans:
            raise RuntimeError(subject + '\n' + message)


    def _recover_running_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            if run_monitor is not None:
                queue_task = QueueTask(job=job, queue_entries=queue_entries,
                                       recover_run_monitor=run_monitor)
                self.add_agent(Agent(tasks=[queue_task],
                                     num_processes=len(queue_entries)))
            else:
                # we could do better, but this retains legacy behavior for now
                for queue_entry in queue_entries:
                    logging.info('Requeuing running HQE %s since it has no '
                                 'process' % queue_entry)
                    queue_entry.requeue()

        self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
                                          orphans, _AUTOSERV_PID_FILE,
                                          recover_entries)


    def _recover_gathering_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            gather_task = GatherLogsTask(job, queue_entries,
                                         recover_run_monitor=run_monitor)
            self.add_agent(Agent([gather_task]))

        self._recover_entries_with_status(
            models.HostQueueEntry.Status.GATHERING,
            orphans, _CRASHINFO_PID_FILE, recover_entries)


    def _recover_parsing_entries(self, orphans):
        def recover_entries(job, queue_entries, run_monitor):
            reparse_task = FinalReparseTask(queue_entries,
                                            recover_run_monitor=run_monitor)
            self.add_agent(Agent([reparse_task], num_processes=0))

        self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
                                          orphans, _PARSER_PID_FILE,
                                          recover_entries)


    def _recover_all_recoverable_entries(self):
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)


    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry__id'):
            if self.host_has_agent(task.host):
                raise SchedulerError(
                    "%s already has a host agent %s." % (
                        task, self._host_agents.get(task.host.id)))

            host = Host(id=task.host.id)
            queue_entry = None
            if task.queue_entry:
                queue_entry = HostQueueEntry(id=task.queue_entry.id)

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, host, queue_entry, run_monitor)


    def _recover_special_task(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a single special task.
        """
        if task.task == models.SpecialTask.Task.VERIFY:
            agent_tasks = self._recover_verify(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.REPAIR:
            agent_tasks = self._recover_repair(task, host, queue_entry,
                                               run_monitor)
        elif task.task == models.SpecialTask.Task.CLEANUP:
            agent_tasks = self._recover_cleanup(task, host, queue_entry,
                                                run_monitor)
        else:
            # Should never happen
            logging.error(
                "Special task id %d had invalid task %s", task.id, task.task)

        self.add_agent(Agent(agent_tasks))


    def _recover_verify(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a verify task.
        No associated queue entry: Verify host
        With associated queue entry: Verify host, and run associated queue
                                     entry
        """
        if not task.queue_entry:
            return [VerifyTask(host=host, task=task,
                               recover_run_monitor=run_monitor)]
        else:
            return [VerifyTask(queue_entry=queue_entry, task=task,
                               recover_run_monitor=run_monitor),
                    SetEntryPendingTask(queue_entry=queue_entry)]


    def _recover_repair(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
                           recover_run_monitor=run_monitor)]


    def _recover_cleanup(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a cleanup task.
        No associated queue entry: Clean host
        With associated queue entry: Clean host, verify host if needed, and
                                     run associated queue entry
        """
        if not task.queue_entry:
            return [CleanupTask(host=host, task=task,
                                recover_run_monitor=run_monitor)]
        else:
            agent_tasks = [CleanupTask(queue_entry=queue_entry,
                                       task=task,
                                       recover_run_monitor=run_monitor)]
            if queue_entry.job.should_run_verify(queue_entry):
                agent_tasks.append(VerifyTask(queue_entry=queue_entry))
            agent_tasks.append(
                SetEntryPendingTask(queue_entry=queue_entry))
            return agent_tasks


    def _requeue_starting_entries(self):
        # temporary measure until we implement proper recovery of Starting HQEs
        for entry in HostQueueEntry.fetch(where='status="Starting"'):
            logging.info('Requeuing "Starting" queue entry %s' % entry)
            assert not self.get_agents_for_entry(entry)
            assert entry.host.status == models.Host.Status.PENDING
            self._reverify_hosts_where('id = %s' % entry.host.id)
            entry.requeue()


    def _check_for_remaining_active_entries(self):
        queue_entries = HostQueueEntry.fetch(
            where='active AND NOT complete AND '
                  '(aborted OR status != "Pending")')

        unrecovered_active_hqes = [entry for entry in queue_entries
                                   if not self.get_agents_for_entry(entry)]
        if unrecovered_active_hqes:
            message = '\n'.join(str(hqe) for hqe in unrecovered_active_hqes)
            raise SchedulerError(
                '%d unrecovered active host queue entries:\n%s' %
                (len(unrecovered_active_hqes), message))


    def _find_reverify(self):
        tasks = models.SpecialTask.objects.filter(
            task=models.SpecialTask.Task.VERIFY, is_active=False,
            is_complete=False, queue_entry__isnull=True)

        for task in tasks:
            host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
            if host.locked or host.invalid or self.host_has_agent(host):
                continue

            logging.info('Force reverifying host %s', host.hostname)
            self.add_agent(Agent([VerifyTask(host=host, task=task)]))


    def _reverify_remaining_hosts(self):
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
            print_message=message)


    def _reverify_hosts_where(self, where,
                              print_message='Reverifying host %s'):
        full_where = 'locked = 0 AND invalid = 0 AND ' + where
        for host in Host.fetch(where=full_where):
            if self.host_has_agent(host):
                # host has already been recovered in some way
                continue
            if print_message:
                logging.info(print_message, host.hostname)
            tasks = host.reverify_tasks()
            self.add_agent(Agent(tasks))


    def _recover_hosts(self):
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)



    def _get_pending_queue_entries(self):
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))


    def _refresh_pending_queue_entries(self):
        """
        Lookup the pending HostQueueEntries and call our HostScheduler
        refresh() method given that list.  Return the list.

        @returns A list of pending HostQueueEntries sorted in priority order.
        """
        queue_entries = self._get_pending_queue_entries()
        if not queue_entries:
            return []

        self._host_scheduler.refresh(queue_entries)

        return queue_entries


    def _schedule_atomic_group(self, queue_entry):
        """
        Schedule the given queue_entry on an atomic group of hosts.

        Returns immediately if there are insufficient available hosts.

        Creates new HostQueueEntries based off of queue_entry for the
        scheduled hosts and starts them all running.
        """
        # This is a virtual host queue entry representing an entire
        # atomic group, find a group and schedule their hosts.
        group_hosts = self._host_scheduler.find_eligible_atomic_group(
            queue_entry)
        if not group_hosts:
            return

        logging.info('Expanding atomic group entry %s with hosts %s',
                     queue_entry,
                     ', '.join(host.hostname for host in group_hosts))
        # The first assigned host uses the original HostQueueEntry
        group_queue_entries = [queue_entry]
        for assigned_host in group_hosts[1:]:
            # Create a new HQE for every additional assigned_host.
            new_hqe = HostQueueEntry.clone(queue_entry)
            new_hqe.save()
            group_queue_entries.append(new_hqe)
        assert len(group_queue_entries) == len(group_hosts)
        for queue_entry, host in itertools.izip(group_queue_entries,
                                                group_hosts):
            self._run_queue_entry(queue_entry, host)


    def _schedule_new_jobs(self):
        queue_entries = self._refresh_pending_queue_entries()
        if not queue_entries:
            return

        for queue_entry in queue_entries:
            if (queue_entry.atomic_group_id is None or
                queue_entry.host_id is not None):
                assigned_host = self._host_scheduler.find_eligible_host(
                    queue_entry)
                if assigned_host:
                    self._run_queue_entry(queue_entry, assigned_host)
            else:
                self._schedule_atomic_group(queue_entry)


    def _run_queue_entry(self, queue_entry, host):
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)


    def _find_aborting(self):
        for entry in HostQueueEntry.fetch(where='aborted and not complete'):
            logging.info('Aborting %s', entry)
            for agent in self.get_agents_for_entry(entry):
                agent.abort()
            entry.abort(self)


    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True

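    # Worked example of the throttling above (illustrative only): with a
    # hypothetical max_processes_started_per_cycle of 50 and 48 processes
    # already started this cycle, a 4-process agent is deferred (48 + 4 > 50),
    # while a 0-process agent always runs.  An agent larger than the per-cycle
    # limit can still start as the first agent of a cycle, provided it fits
    # within the drone manager's total runnable-process capacity.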

    def _handle_agents(self):
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())


    def _process_recurring_runs(self):
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                rrun.delete()
            else:
                if rrun.loop_count != 0:  # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()


class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

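    # Typical usage sketch (illustrative only, not part of the original file):
    #   monitor = PidfileRunMonitor()
    #   monitor.run(command, working_directory,
    #               nice_level=AUTOSERV_NICE_LEVEL,
    #               pidfile_name=_AUTOSERV_PID_FILE)
    # or, when recovering an already-running process:
    #   monitor = PidfileRunMonitor()
    #   monitor.attach_to_existing_process(execution_path)
    #   if monitor.has_process():
    #       process = monitor.get_process()
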
    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
showard21baa452008-10-21 00:08:39 +00001241 self._handle_pidfile_error(
1242 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001243
showard21baa452008-10-21 00:08:39 +00001244
1245 def _get_pidfile_info(self):
1246 """\
1247 After completion, self._state will contain:
1248         process=None, exit_status=None if autoserv has not yet run
1249         process!=None, exit_status=None if autoserv is running
1250         process!=None, exit_status!=None if autoserv has completed
1251 """
1252 try:
1253 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001254 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001255 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001256
1257
showard170873e2009-01-07 00:22:26 +00001258 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001259 """\
1260 Called when no pidfile is found or no pid is in the pidfile.
1261 """
showard170873e2009-01-07 00:22:26 +00001262 message = 'No pid found at %s' % self.pidfile_id
showard170873e2009-01-07 00:22:26 +00001263 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1264 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001265 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001266 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001267
1268
showard35162b02009-03-03 02:17:30 +00001269 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001270 """\
1271 Called when autoserv has exited without writing an exit status,
1272 or we've timed out waiting for autoserv to write a pid to the
1273 pidfile. In either case, we just return failure and the caller
1274 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001275
showard170873e2009-01-07 00:22:26 +00001276 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001277 """
1278 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001279 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001280 self._state.exit_status = 1
1281 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001282
1283
jadmanski0afbb632008-06-06 21:10:57 +00001284 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001285 self._get_pidfile_info()
1286 return self._state.exit_status
1287
1288
1289 def num_tests_failed(self):
showard6bba3d12009-08-20 23:31:41 +00001290 """@returns The number of tests that failed or -1 if unknown."""
showard21baa452008-10-21 00:08:39 +00001291 self._get_pidfile_info()
showard6bba3d12009-08-20 23:31:41 +00001292 if self._state.num_tests_failed is None:
1293 return -1
showard21baa452008-10-21 00:08:39 +00001294 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001295
1296
mbligh36768f02008-02-22 18:28:33 +00001297class Agent(object):
showard77182562009-06-10 00:16:05 +00001298 """
1299 An agent for use by the Dispatcher class to perform a sequence of tasks.
1300
1301 The following methods are required on all task objects:
1302 poll() - Called periodically to let the task check its status and
1303                update its internal state.
1304 is_done() - Returns True if the task is finished.
1305 abort() - Called when an abort has been requested. The task must
1306 set its aborted attribute to True if it actually aborted.
1307
1308 The following attributes are required on all task objects:
1309 aborted - bool, True if this task was aborted.
1310 failure_tasks - A sequence of tasks to be run using a new Agent
1311 by the dispatcher should this task fail.
1312 success - bool, True if this task succeeded.
1313 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1314 host_ids - A sequence of Host ids this task represents.
1315
1316 The following attribute is written to all task objects:
1317 agent - A reference to the Agent instance that the task has been
1318 added to.
1319 """
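    # Minimal sketch of a task object satisfying the interface described above
    # (purely illustrative; real tasks in this file subclass AgentTask):
    #
    #   class NoopTask(object):
    #       def __init__(self):
    #           self.aborted = False
    #           self.failure_tasks = ()
    #           self.success = False
    #           self.queue_entry_ids = ()
    #           self.host_ids = ()
    #           self.agent = None  # filled in by Agent.add_task()
    #       def poll(self):
    #           self.success = True
    #       def is_done(self):
    #           return self.success
    #       def abort(self):
    #           self.aborted = True
    #
    #   dispatcher.add_agent(Agent([NoopTask()]))
    #
    # 'dispatcher' stands in for the scheduler's Dispatcher instance.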
1320
1321
showard170873e2009-01-07 00:22:26 +00001322 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001323 """
1324 @param tasks: A list of tasks as described in the class docstring.
1325 @param num_processes: The number of subprocesses the Agent represents.
1326 This is used by the Dispatcher for managing the load on the
1327 system. Defaults to 1.
1328 """
jadmanski0afbb632008-06-06 21:10:57 +00001329 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001330 self.queue = None
showard77182562009-06-10 00:16:05 +00001331 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001332 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001333 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001334
showard170873e2009-01-07 00:22:26 +00001335 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1336 for task in tasks)
1337 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1338
showardd3dc1992009-04-22 21:01:40 +00001339 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001340 for task in tasks:
1341 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001342
1343
showardd3dc1992009-04-22 21:01:40 +00001344 def _clear_queue(self):
1345 self.queue = Queue.Queue(0)
1346
1347
showard170873e2009-01-07 00:22:26 +00001348 def _union_ids(self, id_lists):
1349 return set(itertools.chain(*id_lists))
1350
1351
jadmanski0afbb632008-06-06 21:10:57 +00001352 def add_task(self, task):
1353 self.queue.put_nowait(task)
1354 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001355
1356
jadmanski0afbb632008-06-06 21:10:57 +00001357 def tick(self):
showard21baa452008-10-21 00:08:39 +00001358 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001359 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001360 self.active_task.poll()
1361 if not self.active_task.is_done():
1362 return
1363 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001364
1365
jadmanski0afbb632008-06-06 21:10:57 +00001366 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001367 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001368 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001369 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001370 if not self.active_task.success:
1371 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001372 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001373
jadmanski0afbb632008-06-06 21:10:57 +00001374 if not self.is_done():
1375 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001376
1377
jadmanski0afbb632008-06-06 21:10:57 +00001378 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001379 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001380 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1381 # get reset.
1382 new_agent = Agent(self.active_task.failure_tasks)
1383 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001384
mblighe2586682008-02-29 22:45:46 +00001385
showard4c5374f2008-09-04 17:02:56 +00001386 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001387 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001388
1389
jadmanski0afbb632008-06-06 21:10:57 +00001390 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001391 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001392
1393
showardd3dc1992009-04-22 21:01:40 +00001394 def abort(self):
showard08a36412009-05-05 01:01:13 +00001395 # abort tasks until the queue is empty or a task ignores the abort
1396 while not self.is_done():
1397 if not self.active_task:
1398 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001399 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001400 if not self.active_task.aborted:
1401 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001402 return
1403 self.active_task = None
1404
showardd3dc1992009-04-22 21:01:40 +00001405
showard77182562009-06-10 00:16:05 +00001406class DelayedCallTask(object):
1407 """
1408 A task object like AgentTask for an Agent to run that waits for the
1409 specified amount of time to have elapsed before calling the supplied
1410 callback once and finishing. If the callback returns anything, it is
1411 assumed to be a new Agent instance and will be added to the dispatcher.
1412
1413 @attribute end_time: The absolute posix time after which this task will
1414 call its callback when it is polled and be finished.
1415
1416 Also has all attributes required by the Agent class.
1417 """
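    # Example (illustrative only): re-verify a host 30 seconds from now by
    # returning a new Agent from the callback.
    #
    #   def _recheck():
    #       return Agent([VerifyTask(host=some_host)])
    #
    #   delay_task = DelayedCallTask(delay_seconds=30, callback=_recheck)
    #   dispatcher.add_agent(Agent([delay_task], num_processes=0))
    #
    # '_recheck', 'some_host' and 'dispatcher' are hypothetical names; the
    # scheduler constructs its own DelayedCallTasks elsewhere.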
1418 def __init__(self, delay_seconds, callback, now_func=None):
1419 """
1420 @param delay_seconds: The delay in seconds from now that this task
1421 will call the supplied callback and be done.
1422 @param callback: A callable to be called by this task once after at
1423 least delay_seconds time has elapsed. It must return None
1424 or a new Agent instance.
1425 @param now_func: A time.time like function. Default: time.time.
1426 Used for testing.
1427 """
1428 assert delay_seconds > 0
1429 assert callable(callback)
1430 if not now_func:
1431 now_func = time.time
1432 self._now_func = now_func
1433 self._callback = callback
1434
1435 self.end_time = self._now_func() + delay_seconds
1436
1437 # These attributes are required by Agent.
1438 self.aborted = False
1439 self.failure_tasks = ()
1440 self.host_ids = ()
1441 self.success = False
1442 self.queue_entry_ids = ()
1443 # This is filled in by Agent.add_task().
1444 self.agent = None
1445
1446
1447 def poll(self):
1448 if self._callback and self._now_func() >= self.end_time:
1449 new_agent = self._callback()
1450 if new_agent:
1451 self.agent.dispatcher.add_agent(new_agent)
1452 self._callback = None
1453 self.success = True
1454
1455
1456 def is_done(self):
1457 return not self._callback
1458
1459
1460 def abort(self):
1461 self.aborted = True
1462 self._callback = None
1463
1464
mbligh36768f02008-02-22 18:28:33 +00001465class AgentTask(object):
showarded2afea2009-07-07 20:54:07 +00001466    def __init__(self, cmd=None, working_directory=None, failure_tasks=(),
1467 pidfile_name=None, paired_with_pidfile=None,
1468 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001469 self.done = False
1470 self.failure_tasks = failure_tasks
jadmanski0afbb632008-06-06 21:10:57 +00001471 self.cmd = cmd
showard170873e2009-01-07 00:22:26 +00001472 self._working_directory = working_directory
jadmanski0afbb632008-06-06 21:10:57 +00001473 self.agent = None
showarded2afea2009-07-07 20:54:07 +00001474 self.monitor = recover_run_monitor
1475 self.started = bool(recover_run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +00001476 self.success = None
showardd3dc1992009-04-22 21:01:40 +00001477 self.aborted = False
showard170873e2009-01-07 00:22:26 +00001478 self.queue_entry_ids = []
1479 self.host_ids = []
1480 self.log_file = None
1481
1482
1483 def _set_ids(self, host=None, queue_entries=None):
1484 if queue_entries and queue_entries != [None]:
1485 self.host_ids = [entry.host.id for entry in queue_entries]
1486 self.queue_entry_ids = [entry.id for entry in queue_entries]
1487 else:
1488 assert host
1489 self.host_ids = [host.id]
mbligh36768f02008-02-22 18:28:33 +00001490
1491
jadmanski0afbb632008-06-06 21:10:57 +00001492 def poll(self):
showard08a36412009-05-05 01:01:13 +00001493 if not self.started:
1494 self.start()
1495 self.tick()
1496
1497
1498 def tick(self):
jadmanski0afbb632008-06-06 21:10:57 +00001499 if self.monitor:
showard08a36412009-05-05 01:01:13 +00001500 exit_code = self.monitor.exit_code()
1501 if exit_code is None:
1502 return
1503 success = (exit_code == 0)
jadmanski0afbb632008-06-06 21:10:57 +00001504 else:
1505 success = False
mbligh36768f02008-02-22 18:28:33 +00001506
jadmanski0afbb632008-06-06 21:10:57 +00001507 self.finished(success)
mbligh36768f02008-02-22 18:28:33 +00001508
1509
jadmanski0afbb632008-06-06 21:10:57 +00001510 def is_done(self):
1511 return self.done
mbligh36768f02008-02-22 18:28:33 +00001512
1513
jadmanski0afbb632008-06-06 21:10:57 +00001514 def finished(self, success):
showard08a36412009-05-05 01:01:13 +00001515 if self.done:
1516 return
jadmanski0afbb632008-06-06 21:10:57 +00001517 self.done = True
1518 self.success = success
1519 self.epilog()
mbligh36768f02008-02-22 18:28:33 +00001520
1521
jadmanski0afbb632008-06-06 21:10:57 +00001522 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001523 assert not self.monitor
mblighd64e5702008-04-04 21:39:28 +00001524
mbligh36768f02008-02-22 18:28:33 +00001525
jadmanski0afbb632008-06-06 21:10:57 +00001526 def cleanup(self):
showard6b733412009-04-27 20:09:18 +00001527 if self.monitor and self.monitor.has_process() and self.log_file:
showard170873e2009-01-07 00:22:26 +00001528 _drone_manager.copy_to_results_repository(
1529 self.monitor.get_process(), self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001530
1531
jadmanski0afbb632008-06-06 21:10:57 +00001532 def epilog(self):
1533 self.cleanup()
mbligh36768f02008-02-22 18:28:33 +00001534
1535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def start(self):
1537 assert self.agent
1538
1539 if not self.started:
1540 self.prolog()
1541 self.run()
1542
1543 self.started = True
1544
1545
1546 def abort(self):
1547 if self.monitor:
1548 self.monitor.kill()
1549 self.done = True
showardd3dc1992009-04-22 21:01:40 +00001550 self.aborted = True
jadmanski0afbb632008-06-06 21:10:57 +00001551 self.cleanup()
1552
1553
showarded2afea2009-07-07 20:54:07 +00001554 def _get_consistent_execution_path(self, execution_entries):
1555 first_execution_path = execution_entries[0].execution_path()
1556 for execution_entry in execution_entries[1:]:
1557 assert execution_entry.execution_path() == first_execution_path, (
1558 '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
1559 execution_entry,
1560 first_execution_path,
1561 execution_entries[0]))
1562 return first_execution_path
showard170873e2009-01-07 00:22:26 +00001563
1564
showarded2afea2009-07-07 20:54:07 +00001565 def _copy_results(self, execution_entries, use_monitor=None):
1566 """
1567 @param execution_entries: list of objects with execution_path() method
1568 """
showard6d1c1432009-08-20 23:30:39 +00001569 if use_monitor is not None and not use_monitor.has_process():
1570 return
1571
showarded2afea2009-07-07 20:54:07 +00001572 assert len(execution_entries) > 0
showard6b733412009-04-27 20:09:18 +00001573 if use_monitor is None:
1574 assert self.monitor
1575 use_monitor = self.monitor
1576 assert use_monitor.has_process()
showarded2afea2009-07-07 20:54:07 +00001577 execution_path = self._get_consistent_execution_path(execution_entries)
1578 results_path = execution_path + '/'
showard6b733412009-04-27 20:09:18 +00001579 _drone_manager.copy_to_results_repository(use_monitor.get_process(),
showard678df4f2009-02-04 21:36:39 +00001580 results_path)
showardde634ee2009-01-30 01:44:24 +00001581
showarda1e74b32009-05-12 17:32:04 +00001582
1583 def _parse_results(self, queue_entries):
showardde634ee2009-01-30 01:44:24 +00001584 reparse_task = FinalReparseTask(queue_entries)
1585 self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))
1586
1587
showarda1e74b32009-05-12 17:32:04 +00001588 def _copy_and_parse_results(self, queue_entries, use_monitor=None):
1589 self._copy_results(queue_entries, use_monitor)
1590 self._parse_results(queue_entries)
1591
1592
showardd3dc1992009-04-22 21:01:40 +00001593 def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
showarded2afea2009-07-07 20:54:07 +00001594 assert not self.monitor
jadmanski0afbb632008-06-06 21:10:57 +00001595 if self.cmd:
showard170873e2009-01-07 00:22:26 +00001596 self.monitor = PidfileRunMonitor()
1597 self.monitor.run(self.cmd, self._working_directory,
1598 nice_level=AUTOSERV_NICE_LEVEL,
showardd3dc1992009-04-22 21:01:40 +00001599 log_file=self.log_file,
1600 pidfile_name=pidfile_name,
1601 paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001602
1603
showardd9205182009-04-27 20:09:55 +00001604class TaskWithJobKeyvals(object):
1605 """AgentTask mixin providing functionality to help with job keyval files."""
1606 _KEYVAL_FILE = 'keyval'
1607 def _format_keyval(self, key, value):
1608 return '%s=%s' % (key, value)
1609
1610
1611 def _keyval_path(self):
1612 """Subclasses must override this"""
1613        raise NotImplementedError('Subclasses must override this')
1614
1615
1616 def _write_keyval_after_job(self, field, value):
1617 assert self.monitor
1618 if not self.monitor.has_process():
1619 return
1620 _drone_manager.write_lines_to_file(
1621 self._keyval_path(), [self._format_keyval(field, value)],
1622 paired_with_process=self.monitor.get_process())
1623
1624
1625 def _job_queued_keyval(self, job):
1626 return 'job_queued', int(time.mktime(job.created_on.timetuple()))
1627
1628
1629 def _write_job_finished(self):
1630 self._write_keyval_after_job("job_finished", int(time.time()))
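
    # The keyval file written by this mixin is a plain text file with one
    # 'key=value' pair per line, for example (timestamps are illustrative):
    #
    #   job_queued=1244567890
    #   job_finished=1244571490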
1631
1632
showarded2afea2009-07-07 20:54:07 +00001633class SpecialAgentTask(AgentTask):
1634 """
1635 Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
1636 """
1637
1638 TASK_TYPE = None
1639 host = None
1640 queue_entry = None
1641
1642 def __init__(self, task, extra_command_args, **kwargs):
1643 assert self.host
1644        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
1646 self.task = task
showard8ac6f2a2009-07-16 14:50:32 +00001647 if task:
1648 kwargs['working_directory'] = task.execution_path()
showarded2afea2009-07-07 20:54:07 +00001649 self._extra_command_args = extra_command_args
1650 super(SpecialAgentTask, self).__init__(**kwargs)
1651
1652
1653 def prolog(self):
1654 super(SpecialAgentTask, self).prolog()
1655 self.task = models.SpecialTask.prepare(self, self.task)
1656 self.cmd = _autoserv_command_line(self.host.hostname,
1657 self._extra_command_args,
1658 queue_entry=self.queue_entry)
1659 self._working_directory = self.task.execution_path()
1660 self.task.activate()
1661
1662
showardb6681aa2009-07-08 21:15:00 +00001663 def cleanup(self):
1664 super(SpecialAgentTask, self).cleanup()
showarda640b2d2009-07-20 22:37:06 +00001665
1666 # self.task can be None if a SpecialAgentTask is aborted before the
1667 # prolog runs
1668 if self.task:
1669 self.task.finish()
1670
1671 if self.monitor and self.monitor.has_process() and self.task:
showarded2afea2009-07-07 20:54:07 +00001672 self._copy_results([self.task])
1673
1674
1675class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
1676 TASK_TYPE = models.SpecialTask.Task.REPAIR
1677
1678
1679 def __init__(self, host, queue_entry=None, task=None,
1680 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001681 """\
showard170873e2009-01-07 00:22:26 +00001682 queue_entry: queue entry to mark failed if this repair fails.
jadmanski0afbb632008-06-06 21:10:57 +00001683 """
jadmanskifb7cfb12008-07-09 14:13:21 +00001684 protection = host_protections.Protection.get_string(host.protection)
jadmanski542537f2008-07-24 14:14:56 +00001685 # normalize the protection name
1686 protection = host_protections.Protection.get_attr_name(protection)
showard170873e2009-01-07 00:22:26 +00001687
jadmanski0afbb632008-06-06 21:10:57 +00001688 self.host = host
showard58721a82009-08-20 23:32:40 +00001689 self.queue_entry = queue_entry
showard170873e2009-01-07 00:22:26 +00001690
showarded2afea2009-07-07 20:54:07 +00001691 super(RepairTask, self).__init__(
1692 task, ['-R', '--host-protection', protection],
1693 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001694
showard2fe3f1d2009-07-06 20:19:11 +00001695 # *don't* include the queue entry in IDs -- if the queue entry is
1696 # aborted, we want to leave the repair task running
1697 self._set_ids(host=host)
1698
mbligh36768f02008-02-22 18:28:33 +00001699
jadmanski0afbb632008-06-06 21:10:57 +00001700 def prolog(self):
showarded2afea2009-07-07 20:54:07 +00001701 super(RepairTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001702 logging.info("repair_task starting")
jadmanski0afbb632008-06-06 21:10:57 +00001703 self.host.set_status('Repairing')
showard2fe3f1d2009-07-06 20:19:11 +00001704
mbligh36768f02008-02-22 18:28:33 +00001705
showardd9205182009-04-27 20:09:55 +00001706 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001707 return os.path.join(self._working_directory, self._KEYVAL_FILE)
showardd9205182009-04-27 20:09:55 +00001708
1709
showardde634ee2009-01-30 01:44:24 +00001710 def _fail_queue_entry(self):
showard2fe3f1d2009-07-06 20:19:11 +00001711 assert self.queue_entry
showardccbd6c52009-03-21 00:10:21 +00001712
showard2fe3f1d2009-07-06 20:19:11 +00001713 if self.queue_entry.meta_host:
showardccbd6c52009-03-21 00:10:21 +00001714 return # don't fail metahost entries, they'll be reassigned
1715
showard2fe3f1d2009-07-06 20:19:11 +00001716 self.queue_entry.update_from_database()
1717 if self.queue_entry.status != 'Queued':
showardccbd6c52009-03-21 00:10:21 +00001718 return # entry has been aborted
1719
showard2fe3f1d2009-07-06 20:19:11 +00001720 self.queue_entry.set_execution_subdir()
showardd9205182009-04-27 20:09:55 +00001721 queued_key, queued_time = self._job_queued_keyval(
showard2fe3f1d2009-07-06 20:19:11 +00001722 self.queue_entry.job)
showardd9205182009-04-27 20:09:55 +00001723 self._write_keyval_after_job(queued_key, queued_time)
1724 self._write_job_finished()
showard678df4f2009-02-04 21:36:39 +00001725 # copy results logs into the normal place for job results
1726 _drone_manager.copy_results_on_drone(
1727 self.monitor.get_process(),
showarded2afea2009-07-07 20:54:07 +00001728 source_path=self._working_directory + '/',
1729 destination_path=self.queue_entry.execution_path() + '/')
showard678df4f2009-02-04 21:36:39 +00001730
showard2fe3f1d2009-07-06 20:19:11 +00001731 self._copy_results([self.queue_entry])
1732 if self.queue_entry.job.parse_failed_repair:
1733 self._parse_results([self.queue_entry])
1734 self.queue_entry.handle_host_failure()
showardde634ee2009-01-30 01:44:24 +00001735
1736
jadmanski0afbb632008-06-06 21:10:57 +00001737 def epilog(self):
1738 super(RepairTask, self).epilog()
showard6d7b2ff2009-06-10 00:16:47 +00001739
jadmanski0afbb632008-06-06 21:10:57 +00001740 if self.success:
1741 self.host.set_status('Ready')
1742 else:
1743 self.host.set_status('Repair Failed')
showard2fe3f1d2009-07-06 20:19:11 +00001744 if self.queue_entry:
showardde634ee2009-01-30 01:44:24 +00001745 self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001746
1747
showarded2afea2009-07-07 20:54:07 +00001748class PreJobTask(SpecialAgentTask):
showard170873e2009-01-07 00:22:26 +00001749 def epilog(self):
1750 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001751 should_copy_results = (self.queue_entry and not self.success
1752 and not self.queue_entry.meta_host)
1753 if should_copy_results:
1754 self.queue_entry.set_execution_subdir()
showarded2afea2009-07-07 20:54:07 +00001755 log_name = os.path.basename(self.task.execution_path())
1756 source = os.path.join(self.task.execution_path(), 'debug',
1757 'autoserv.DEBUG')
1758 destination = os.path.join(self.queue_entry.execution_path(),
1759 log_name)
showard170873e2009-01-07 00:22:26 +00001760 _drone_manager.copy_to_results_repository(
showarded2afea2009-07-07 20:54:07 +00001761 self.monitor.get_process(), source,
showard170873e2009-01-07 00:22:26 +00001762 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001763
showard58721a82009-08-20 23:32:40 +00001764 if not self.success and self.queue_entry:
1765 self.queue_entry.requeue()
1766
showard8fe93b52008-11-18 17:53:22 +00001767
1768class VerifyTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00001769 TASK_TYPE = models.SpecialTask.Task.VERIFY
1770
1771
1772 def __init__(self, queue_entry=None, host=None, task=None,
1773 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001774 assert bool(queue_entry) != bool(host)
jadmanski0afbb632008-06-06 21:10:57 +00001775 self.host = host or queue_entry.host
1776 self.queue_entry = queue_entry
mbligh36768f02008-02-22 18:28:33 +00001777
showarde788ea62008-11-17 21:02:47 +00001778 failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
showarded2afea2009-07-07 20:54:07 +00001779 super(VerifyTask, self).__init__(
1780 task, ['-v'], failure_tasks=failure_tasks,
1781 recover_run_monitor=recover_run_monitor)
mblighe2586682008-02-29 22:45:46 +00001782
showard170873e2009-01-07 00:22:26 +00001783 self._set_ids(host=host, queue_entries=[queue_entry])
mblighe2586682008-02-29 22:45:46 +00001784
1785
jadmanski0afbb632008-06-06 21:10:57 +00001786 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001787 super(VerifyTask, self).prolog()
showarded2afea2009-07-07 20:54:07 +00001788
showardb18134f2009-03-20 20:52:18 +00001789 logging.info("starting verify on %s", self.host.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00001790 if self.queue_entry:
1791 self.queue_entry.set_status('Verifying')
jadmanski0afbb632008-06-06 21:10:57 +00001792 self.host.set_status('Verifying')
mbligh36768f02008-02-22 18:28:33 +00001793
showarded2afea2009-07-07 20:54:07 +00001794 # Delete any other queued verifies for this host. One verify will do
1795 # and there's no need to keep records of other requests.
1796 queued_verifies = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +00001797 host__id=self.host.id,
1798 task=models.SpecialTask.Task.VERIFY,
showarded2afea2009-07-07 20:54:07 +00001799 is_active=False, is_complete=False)
1800 queued_verifies = queued_verifies.exclude(id=self.task.id)
1801 queued_verifies.delete()
showard2fe3f1d2009-07-06 20:19:11 +00001802
mbligh36768f02008-02-22 18:28:33 +00001803
jadmanski0afbb632008-06-06 21:10:57 +00001804 def epilog(self):
1805 super(VerifyTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00001806 if self.success:
jadmanski0afbb632008-06-06 21:10:57 +00001807 self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001808
1809
showardb5626452009-06-30 01:57:28 +00001810class CleanupHostsMixin(object):
1811 def _reboot_hosts(self, job, queue_entries, final_success,
1812 num_tests_failed):
1813 reboot_after = job.reboot_after
1814 do_reboot = (
1815 # always reboot after aborted jobs
1816 self._final_status == models.HostQueueEntry.Status.ABORTED
1817 or reboot_after == models.RebootAfter.ALWAYS
1818 or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
1819 and final_success and num_tests_failed == 0))
1820
1821 for queue_entry in queue_entries:
1822 if do_reboot:
1823 # don't pass the queue entry to the CleanupTask. if the cleanup
1824 # fails, the job doesn't care -- it's over.
1825 cleanup_task = CleanupTask(host=queue_entry.host)
1826 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
1827 else:
1828 queue_entry.host.set_status('Ready')
1829
1830
1831class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
showarded2afea2009-07-07 20:54:07 +00001832 def __init__(self, job, queue_entries, cmd=None, group_name='',
1833 recover_run_monitor=None):
jadmanski0afbb632008-06-06 21:10:57 +00001834 self.job = job
1835 self.queue_entries = queue_entries
showardf1ae3542009-05-11 19:26:02 +00001836 self.group_name = group_name
showarded2afea2009-07-07 20:54:07 +00001837 super(QueueTask, self).__init__(
1838 cmd, self._execution_path(),
1839 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00001840 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001841
1842
showard73ec0442009-02-07 02:05:20 +00001843 def _keyval_path(self):
showarded2afea2009-07-07 20:54:07 +00001844 return os.path.join(self._execution_path(), self._KEYVAL_FILE)
showard73ec0442009-02-07 02:05:20 +00001845
1846
1847 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1848 keyval_contents = '\n'.join(self._format_keyval(key, value)
1849 for key, value in keyval_dict.iteritems())
1850 # always end with a newline to allow additional keyvals to be written
1851 keyval_contents += '\n'
showarded2afea2009-07-07 20:54:07 +00001852 _drone_manager.attach_file_to_execution(self._execution_path(),
showard73ec0442009-02-07 02:05:20 +00001853 keyval_contents,
1854 file_path=keyval_path)
1855
1856
1857 def _write_keyvals_before_job(self, keyval_dict):
1858 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1859
1860
showard170873e2009-01-07 00:22:26 +00001861 def _write_host_keyvals(self, host):
showarded2afea2009-07-07 20:54:07 +00001862 keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
showard170873e2009-01-07 00:22:26 +00001863 host.hostname)
1864 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001865 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1866 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001867
1868
showarded2afea2009-07-07 20:54:07 +00001869 def _execution_path(self):
1870 return self.queue_entries[0].execution_path()
mblighbb421852008-03-11 22:36:16 +00001871
1872
jadmanski0afbb632008-06-06 21:10:57 +00001873 def prolog(self):
showardd9205182009-04-27 20:09:55 +00001874 queued_key, queued_time = self._job_queued_keyval(self.job)
showardf1ae3542009-05-11 19:26:02 +00001875 keyval_dict = {queued_key: queued_time}
1876 if self.group_name:
1877 keyval_dict['host_group_name'] = self.group_name
1878 self._write_keyvals_before_job(keyval_dict)
jadmanski0afbb632008-06-06 21:10:57 +00001879 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001880 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001881 queue_entry.set_status('Running')
showard12f3e322009-05-13 21:27:42 +00001882 queue_entry.update_field('started_on', datetime.datetime.now())
jadmanski0afbb632008-06-06 21:10:57 +00001883 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001884 queue_entry.host.update_field('dirty', 1)
showardc6a56872009-07-28 20:11:58 +00001885 if self.job.synch_count == 1 and len(self.queue_entries) == 1:
1886 # TODO(gps): Remove this if nothing needs it anymore.
1887 # A potential user is: tko/parser
jadmanski0afbb632008-06-06 21:10:57 +00001888 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001889
1890
showard35162b02009-03-03 02:17:30 +00001891 def _write_lost_process_error_file(self):
showarded2afea2009-07-07 20:54:07 +00001892 error_file_path = os.path.join(self._execution_path(), 'job_failure')
showard35162b02009-03-03 02:17:30 +00001893 _drone_manager.write_lines_to_file(error_file_path,
1894 [_LOST_PROCESS_ERROR])
1895
1896
showardd3dc1992009-04-22 21:01:40 +00001897 def _finish_task(self):
showard08a36412009-05-05 01:01:13 +00001898 if not self.monitor:
1899 return
1900
showardd9205182009-04-27 20:09:55 +00001901 self._write_job_finished()
1902
showard6d1c1432009-08-20 23:30:39 +00001903 gather_task = GatherLogsTask(self.job, self.queue_entries)
1904 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001905
1906 if self.monitor.lost_process:
1907 self._write_lost_process_error_file()
1908 for queue_entry in self.queue_entries:
1909 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001910
1911
showardcbd74612008-11-19 21:42:02 +00001912 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001913 _drone_manager.write_lines_to_file(
showarded2afea2009-07-07 20:54:07 +00001914 os.path.join(self._execution_path(), 'status.log'),
showard170873e2009-01-07 00:22:26 +00001915 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001916 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001917
1918
jadmanskif7fa2cc2008-10-01 14:13:23 +00001919 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001920 if not self.monitor or not self.monitor.has_process():
1921 return
1922
jadmanskif7fa2cc2008-10-01 14:13:23 +00001923 # build up sets of all the aborted_by and aborted_on values
1924 aborted_by, aborted_on = set(), set()
1925 for queue_entry in self.queue_entries:
1926 if queue_entry.aborted_by:
1927 aborted_by.add(queue_entry.aborted_by)
1928 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1929 aborted_on.add(t)
1930
1931 # extract some actual, unique aborted by value and write it out
1932 assert len(aborted_by) <= 1
1933 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001934 aborted_by_value = aborted_by.pop()
1935 aborted_on_value = max(aborted_on)
1936 else:
1937 aborted_by_value = 'autotest_system'
1938 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001939
showarda0382352009-02-11 23:36:43 +00001940 self._write_keyval_after_job("aborted_by", aborted_by_value)
1941 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001942
showardcbd74612008-11-19 21:42:02 +00001943 aborted_on_string = str(datetime.datetime.fromtimestamp(
1944 aborted_on_value))
1945 self._write_status_comment('Job aborted by %s on %s' %
1946 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001947
1948
jadmanski0afbb632008-06-06 21:10:57 +00001949 def abort(self):
1950 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001951 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001952 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001953
1954
jadmanski0afbb632008-06-06 21:10:57 +00001955 def epilog(self):
1956 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001957 self._finish_task()
1958 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001959
1960
showardd3dc1992009-04-22 21:01:40 +00001961class PostJobTask(AgentTask):
1962 def __init__(self, queue_entries, pidfile_name, logfile_name,
showarded2afea2009-07-07 20:54:07 +00001963 recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00001964 self._queue_entries = queue_entries
1965 self._pidfile_name = pidfile_name
showardd3dc1992009-04-22 21:01:40 +00001966
showarded2afea2009-07-07 20:54:07 +00001967 self._execution_path = self._get_consistent_execution_path(
1968 queue_entries)
1969 self._results_dir = _drone_manager.absolute_path(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001970 self._autoserv_monitor = PidfileRunMonitor()
showarded2afea2009-07-07 20:54:07 +00001971 self._autoserv_monitor.attach_to_existing_process(self._execution_path)
showardd3dc1992009-04-22 21:01:40 +00001972 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1973
1974 if _testing_mode:
1975 command = 'true'
1976 else:
1977 command = self._generate_command(self._results_dir)
1978
showarded2afea2009-07-07 20:54:07 +00001979 super(PostJobTask, self).__init__(
1980 cmd=command, working_directory=self._execution_path,
1981 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00001982
showarded2afea2009-07-07 20:54:07 +00001983 self.log_file = os.path.join(self._execution_path, logfile_name)
showardd3dc1992009-04-22 21:01:40 +00001984 self._final_status = self._determine_final_status()
1985
1986
1987 def _generate_command(self, results_dir):
1988 raise NotImplementedError('Subclasses must override this')
1989
1990
1991 def _job_was_aborted(self):
1992 was_aborted = None
1993 for queue_entry in self._queue_entries:
1994 queue_entry.update_from_database()
1995 if was_aborted is None: # first queue entry
1996 was_aborted = bool(queue_entry.aborted)
1997 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1998 email_manager.manager.enqueue_notify_email(
1999 'Inconsistent abort state',
2000 'Queue entries have inconsistent abort state: ' +
2001                    ', '.join('%s (%s)' % (entry, entry.aborted)
                                  for entry in self._queue_entries))
2002 # don't crash here, just assume true
2003 return True
2004 return was_aborted
2005
2006
2007 def _determine_final_status(self):
2008 if self._job_was_aborted():
2009 return models.HostQueueEntry.Status.ABORTED
2010
2011 # we'll use a PidfileRunMonitor to read the autoserv exit status
2012 if self._autoserv_monitor.exit_code() == 0:
2013 return models.HostQueueEntry.Status.COMPLETED
2014 return models.HostQueueEntry.Status.FAILED
2015
2016
2017 def run(self):
showard5add1c82009-05-26 19:27:46 +00002018 # make sure we actually have results to work with.
2019 # this should never happen in normal operation.
2020 if not self._autoserv_monitor.has_process():
2021 email_manager.manager.enqueue_notify_email(
2022 'No results in post-job task',
2023 'No results in post-job task at %s' %
2024 self._autoserv_monitor.pidfile_id)
2025 self.finished(False)
2026 return
2027
2028 super(PostJobTask, self).run(
2029 pidfile_name=self._pidfile_name,
2030 paired_with_pidfile=self._paired_with_pidfile)
showardd3dc1992009-04-22 21:01:40 +00002031
2032
2033 def _set_all_statuses(self, status):
2034 for queue_entry in self._queue_entries:
2035 queue_entry.set_status(status)
2036
2037
2038 def abort(self):
2039 # override AgentTask.abort() to avoid killing the process and ending
2040 # the task. post-job tasks continue when the job is aborted.
2041 pass
2042
2043
showardb5626452009-06-30 01:57:28 +00002044class GatherLogsTask(PostJobTask, CleanupHostsMixin):
showardd3dc1992009-04-22 21:01:40 +00002045 """
2046 Task responsible for
2047 * gathering uncollected logs (if Autoserv crashed hard or was killed)
2048 * copying logs to the results repository
2049 * spawning CleanupTasks for hosts, if necessary
2050 * spawning a FinalReparseTask for the job
2051 """
showarded2afea2009-07-07 20:54:07 +00002052 def __init__(self, job, queue_entries, recover_run_monitor=None):
showardd3dc1992009-04-22 21:01:40 +00002053 self._job = job
2054 super(GatherLogsTask, self).__init__(
2055 queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
showarded2afea2009-07-07 20:54:07 +00002056 logfile_name='.collect_crashinfo.log',
2057 recover_run_monitor=recover_run_monitor)
showardd3dc1992009-04-22 21:01:40 +00002058 self._set_ids(queue_entries=queue_entries)
2059
2060
2061 def _generate_command(self, results_dir):
2062 host_list = ','.join(queue_entry.host.hostname
2063 for queue_entry in self._queue_entries)
2064 return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
2065 '-r', results_dir]
2066
2067
2068 def prolog(self):
2069 super(GatherLogsTask, self).prolog()
2070 self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)
2071
2072
showardd3dc1992009-04-22 21:01:40 +00002073 def epilog(self):
2074 super(GatherLogsTask, self).epilog()
showardb5626452009-06-30 01:57:28 +00002075
showard6d1c1432009-08-20 23:30:39 +00002076 self._copy_and_parse_results(self._queue_entries,
2077 use_monitor=self._autoserv_monitor)
2078
2079 if self._autoserv_monitor.has_process():
2080 final_success = (self._final_status ==
2081 models.HostQueueEntry.Status.COMPLETED)
2082 num_tests_failed = self._autoserv_monitor.num_tests_failed()
2083 else:
2084 final_success = False
2085 num_tests_failed = 0
2086
showardb5626452009-06-30 01:57:28 +00002087 self._reboot_hosts(self._job, self._queue_entries, final_success,
2088 num_tests_failed)
showardd3dc1992009-04-22 21:01:40 +00002089
2090
showard0bbfc212009-04-29 21:06:13 +00002091 def run(self):
showard597bfd32009-05-08 18:22:50 +00002092 autoserv_exit_code = self._autoserv_monitor.exit_code()
2093 # only run if Autoserv exited due to some signal. if we have no exit
2094 # code, assume something bad (and signal-like) happened.
2095 if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
showard0bbfc212009-04-29 21:06:13 +00002096 super(GatherLogsTask, self).run()
showard597bfd32009-05-08 18:22:50 +00002097 else:
2098 self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00002099
2100
showard8fe93b52008-11-18 17:53:22 +00002101class CleanupTask(PreJobTask):
showarded2afea2009-07-07 20:54:07 +00002102 TASK_TYPE = models.SpecialTask.Task.CLEANUP
2103
2104
2105 def __init__(self, host=None, queue_entry=None, task=None,
2106 recover_run_monitor=None):
showardfa8629c2008-11-04 16:51:23 +00002107 assert bool(host) ^ bool(queue_entry)
2108 if queue_entry:
2109 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00002110 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00002111 self.host = host
showard170873e2009-01-07 00:22:26 +00002112
showarde788ea62008-11-17 21:02:47 +00002113 repair_task = RepairTask(host, queue_entry=queue_entry)
showarded2afea2009-07-07 20:54:07 +00002114 super(CleanupTask, self).__init__(
2115 task, ['--cleanup'], failure_tasks=[repair_task],
2116 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002117
2118 self._set_ids(host=host, queue_entries=[queue_entry])
mbligh16c722d2008-03-05 00:58:44 +00002119
mblighd5c95802008-03-05 00:33:46 +00002120
jadmanski0afbb632008-06-06 21:10:57 +00002121 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00002122 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00002123 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00002124 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00002125
mblighd5c95802008-03-05 00:33:46 +00002126
showard21baa452008-10-21 00:08:39 +00002127 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002128 super(CleanupTask, self).epilog()
showard2fe3f1d2009-07-06 20:19:11 +00002129
showard21baa452008-10-21 00:08:39 +00002130 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002131 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002132 self.host.update_field('dirty', 0)
2133
2134
showardd3dc1992009-04-22 21:01:40 +00002135class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002136 _num_running_parses = 0
2137
showarded2afea2009-07-07 20:54:07 +00002138 def __init__(self, queue_entries, recover_run_monitor=None):
2139 super(FinalReparseTask, self).__init__(
2140 queue_entries, pidfile_name=_PARSER_PID_FILE,
2141 logfile_name='.parse.log',
2142 recover_run_monitor=recover_run_monitor)
showard170873e2009-01-07 00:22:26 +00002143 # don't use _set_ids, since we don't want to set the host_ids
2144 self.queue_entry_ids = [entry.id for entry in queue_entries]
showarded2afea2009-07-07 20:54:07 +00002145 self._parse_started = self.started
showard97aed502008-11-04 02:01:24 +00002146
showard97aed502008-11-04 02:01:24 +00002147
2148 @classmethod
2149 def _increment_running_parses(cls):
2150 cls._num_running_parses += 1
2151
2152
2153 @classmethod
2154 def _decrement_running_parses(cls):
2155 cls._num_running_parses -= 1
2156
2157
2158 @classmethod
2159 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002160 return (cls._num_running_parses <
2161 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002162
2163
2164 def prolog(self):
2165 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002166 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002167
2168
2169 def epilog(self):
2170 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002171 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002172
2173
showardd3dc1992009-04-22 21:01:40 +00002174 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002175 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002176 results_dir]
showard97aed502008-11-04 02:01:24 +00002177
2178
showard08a36412009-05-05 01:01:13 +00002179 def tick(self):
2180 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002181 # and we can, at which point we revert to default behavior
2182 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002183 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002184 else:
2185 self._try_starting_parse()
2186
2187
2188 def run(self):
2189 # override run() to not actually run unless we can
2190 self._try_starting_parse()
2191
2192
2193 def _try_starting_parse(self):
2194 if not self._can_run_new_parse():
2195 return
showard170873e2009-01-07 00:22:26 +00002196
showard97aed502008-11-04 02:01:24 +00002197 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002198 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002199
showard97aed502008-11-04 02:01:24 +00002200 self._increment_running_parses()
2201 self._parse_started = True
2202
2203
2204 def finished(self, success):
2205 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00002206 if self._parse_started:
2207 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002208
2209
showardc9ae1782009-01-30 01:42:37 +00002210class SetEntryPendingTask(AgentTask):
2211 def __init__(self, queue_entry):
2212 super(SetEntryPendingTask, self).__init__(cmd='')
2213 self._queue_entry = queue_entry
2214 self._set_ids(queue_entries=[queue_entry])
2215
2216
2217 def run(self):
2218 agent = self._queue_entry.on_pending()
2219 if agent:
2220 self.agent.dispatcher.add_agent(agent)
2221 self.finished(True)
2222
2223
showarda3c58572009-03-12 20:36:59 +00002224class DBError(Exception):
2225 """Raised by the DBObject constructor when its select fails."""
2226
2227
mbligh36768f02008-02-22 18:28:33 +00002228class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002229 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002230
2231 # Subclasses MUST override these:
2232 _table_name = ''
2233 _fields = ()
2234
showarda3c58572009-03-12 20:36:59 +00002235 # A mapping from (type, id) to the instance of the object for that
2236 # particular id. This prevents us from creating new Job() and Host()
2237 # instances for every HostQueueEntry object that we instantiate as
2238 # multiple HQEs often share the same Job.
2239 _instances_by_type_and_id = weakref.WeakValueDictionary()
2240 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002241
showarda3c58572009-03-12 20:36:59 +00002242
2243 def __new__(cls, id=None, **kwargs):
2244 """
2245 Look to see if we already have an instance for this particular type
2246 and id. If so, use it instead of creating a duplicate instance.
2247 """
2248 if id is not None:
2249 instance = cls._instances_by_type_and_id.get((cls, id))
2250 if instance:
2251 return instance
2252 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2253
2254
2255 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002256 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002257 assert self._table_name, '_table_name must be defined in your class'
2258 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002259 if not new_record:
2260 if self._initialized and not always_query:
2261 return # We've already been initialized.
2262 if id is None:
2263 id = row[0]
2264 # Tell future constructors to use us instead of re-querying while
2265 # this instance is still around.
2266 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002267
showard6ae5ea92009-02-25 00:11:51 +00002268 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002269
jadmanski0afbb632008-06-06 21:10:57 +00002270 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002271
jadmanski0afbb632008-06-06 21:10:57 +00002272 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002273 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002274
showarda3c58572009-03-12 20:36:59 +00002275 if self._initialized:
2276 differences = self._compare_fields_in_row(row)
2277 if differences:
showard7629f142009-03-27 21:02:02 +00002278 logging.warn(
2279 'initialized %s %s instance requery is updating: %s',
2280 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002281 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002282 self._initialized = True
2283
2284
2285 @classmethod
2286 def _clear_instance_cache(cls):
2287 """Used for testing, clear the internal instance cache."""
2288 cls._instances_by_type_and_id.clear()
2289
2290
showardccbd6c52009-03-21 00:10:21 +00002291 def _fetch_row_from_db(self, row_id):
2292 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2293 rows = _db.execute(sql, (row_id,))
2294 if not rows:
showard76e29d12009-04-15 21:53:10 +00002295 raise DBError("row not found (table=%s, row id=%s)"
2296 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002297 return rows[0]
2298
2299
showarda3c58572009-03-12 20:36:59 +00002300 def _assert_row_length(self, row):
2301 assert len(row) == len(self._fields), (
2302 "table = %s, row = %s/%d, fields = %s/%d" % (
2303 self.__table, row, len(row), self._fields, len(self._fields)))
2304
2305
2306 def _compare_fields_in_row(self, row):
2307 """
2308 Given a row as returned by a SELECT query, compare it to our existing
2309 in memory fields.
2310
2311 @param row - A sequence of values corresponding to fields named in
2312                     the class attribute _fields.
2313
2314 @returns A dictionary listing the differences keyed by field name
2315 containing tuples of (current_value, row_value).
2316 """
2317 self._assert_row_length(row)
2318 differences = {}
2319 for field, row_value in itertools.izip(self._fields, row):
2320 current_value = getattr(self, field)
2321 if current_value != row_value:
2322 differences[field] = (current_value, row_value)
2323 return differences
showard2bab8f42008-11-12 18:15:22 +00002324
2325
2326 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002327 """
2328 Update our field attributes using a single row returned by SELECT.
2329
2330 @param row - A sequence of values corresponding to fields named in
2331 the class fields list.
2332 """
2333 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002334
showard2bab8f42008-11-12 18:15:22 +00002335 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002336 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002337 setattr(self, field, value)
2338 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002339
showard2bab8f42008-11-12 18:15:22 +00002340 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002341
mblighe2586682008-02-29 22:45:46 +00002342
showardccbd6c52009-03-21 00:10:21 +00002343 def update_from_database(self):
2344 assert self.id is not None
2345 row = self._fetch_row_from_db(self.id)
2346 self._update_fields_from_row(row)
2347
2348
jadmanski0afbb632008-06-06 21:10:57 +00002349 def count(self, where, table = None):
2350 if not table:
2351 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002352
jadmanski0afbb632008-06-06 21:10:57 +00002353 rows = _db.execute("""
2354 SELECT count(*) FROM %s
2355 WHERE %s
2356 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002357
jadmanski0afbb632008-06-06 21:10:57 +00002358 assert len(rows) == 1
2359
2360 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002361
2362
showardd3dc1992009-04-22 21:01:40 +00002363 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002364 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002365
showard2bab8f42008-11-12 18:15:22 +00002366 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002367 return
mbligh36768f02008-02-22 18:28:33 +00002368
mblighf8c624d2008-07-03 16:58:45 +00002369 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002370 _db.execute(query, (value, self.id))
2371
showard2bab8f42008-11-12 18:15:22 +00002372 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002373
2374
jadmanski0afbb632008-06-06 21:10:57 +00002375 def save(self):
2376 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002377 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002378 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002379 values = []
2380 for key in keys:
2381 value = getattr(self, key)
2382 if value is None:
2383 values.append('NULL')
2384 else:
2385 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002386 values_str = ','.join(values)
2387 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2388 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002389 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002390 # Update our id to the one the database just assigned to us.
2391 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002392
2393
jadmanski0afbb632008-06-06 21:10:57 +00002394 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002395        self._instances_by_type_and_id.pop((type(self), self.id), None)
2396 self._initialized = False
2397 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002398 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2399 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002400
2401
showard63a34772008-08-18 19:32:50 +00002402 @staticmethod
2403 def _prefix_with(string, prefix):
2404 if string:
2405 string = prefix + string
2406 return string
2407
2408
jadmanski0afbb632008-06-06 21:10:57 +00002409 @classmethod
showard989f25d2008-10-01 11:38:11 +00002410 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002411 """
2412 Construct instances of our class based on the given database query.
2413
2414 @yields One class instance for each row fetched.
2415 """
showard63a34772008-08-18 19:32:50 +00002416 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2417 where = cls._prefix_with(where, 'WHERE ')
2418 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002419 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002420 'joins' : joins,
2421 'where' : where,
2422 'order_by' : order_by})
2423 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002424 for row in rows:
2425 yield cls(row=row)
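    # Usage sketch (illustrative parameters): fetch() yields one instance per
    # matching row, e.g.
    #   for entry in HostQueueEntry.fetch(where='job_id = %s AND not complete',
    #                                     params=(some_job_id,)):
    #       ...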
mblighe2586682008-02-29 22:45:46 +00002426
mbligh36768f02008-02-22 18:28:33 +00002427
2428class IneligibleHostQueue(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002429 _table_name = 'ineligible_host_queues'
2430 _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002431
2432
showard89f84db2009-03-12 20:39:13 +00002433class AtomicGroup(DBObject):
2434 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002435 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2436 'invalid')
showard89f84db2009-03-12 20:39:13 +00002437
2438
showard989f25d2008-10-01 11:38:11 +00002439class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002440 _table_name = 'labels'
2441 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002442 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002443
2444
showard6157c632009-07-06 20:19:31 +00002445 def __repr__(self):
2446 return 'Label(name=%r, id=%d, atomic_group_id=%r)' % (
2447 self.name, self.id, self.atomic_group_id)
2448
2449
mbligh36768f02008-02-22 18:28:33 +00002450class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002451 _table_name = 'hosts'
2452 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2453 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2454
2455
jadmanski0afbb632008-06-06 21:10:57 +00002456    def set_status(self, status):
showardb18134f2009-03-20 20:52:18 +00002457 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002458        self.update_field('status', status)
mbligh36768f02008-02-22 18:28:33 +00002459
2460
showard170873e2009-01-07 00:22:26 +00002461 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002462 """
showard170873e2009-01-07 00:22:26 +00002463 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002464 """
2465 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002466 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002467 FROM labels
2468 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002469 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002470 ORDER BY labels.name
2471 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002472 platform = None
2473 all_labels = []
2474 for label_name, is_platform in rows:
2475 if is_platform:
2476 platform = label_name
2477 all_labels.append(label_name)
2478 return platform, all_labels
2479
2480
showard2fe3f1d2009-07-06 20:19:11 +00002481 def reverify_tasks(self):
2482 cleanup_task = CleanupTask(host=self)
2483 verify_task = VerifyTask(host=self)
2484
showard6d7b2ff2009-06-10 00:16:47 +00002485        # set the status now so the scheduler does not hand this host to another job
showard2fe3f1d2009-07-06 20:19:11 +00002486 self.set_status('Cleaning')
2487 return [cleanup_task, verify_task]
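    # The returned tasks are typically wrapped in an Agent by the caller; see
    # HostQueueEntry.abort() below, which does roughly:
    #   dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))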
showardd8e548a2008-09-09 03:04:57 +00002488
2489
showard54c1ea92009-05-20 00:32:58 +00002490 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2491
2492
2493 @classmethod
2494 def cmp_for_sort(cls, a, b):
2495 """
2496 A comparison function for sorting Host objects by hostname.
2497
2498 This strips any trailing numeric digits, ignores leading 0s and
2499 compares hostnames by the leading name and the trailing digits as a
2500 number. If both hostnames do not match this pattern, they are simply
2501 compared as lower case strings.
2502
2503 Example of how hostnames will be sorted:
2504
2505 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2506
2507        This hopefully satisfies most people's hostname sorting needs regardless
2508 of their exact naming schemes. Nobody sane should have both a host10
2509 and host010 (but the algorithm works regardless).
2510 """
2511 lower_a = a.hostname.lower()
2512 lower_b = b.hostname.lower()
2513 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2514 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2515 if match_a and match_b:
2516 name_a, number_a_str = match_a.groups()
2517 name_b, number_b_str = match_b.groups()
2518 number_a = int(number_a_str.lstrip('0'))
2519 number_b = int(number_b_str.lstrip('0'))
2520 result = cmp((name_a, number_a), (name_b, number_b))
2521 if result == 0 and lower_a != lower_b:
2522 # If they compared equal above but the lower case names are
2523 # indeed different, don't report equality. abc012 != abc12.
2524 return cmp(lower_a, lower_b)
2525 return result
2526 else:
2527 return cmp(lower_a, lower_b)
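    # Usage sketch (illustrative): pass this comparator to a sort, e.g.
    #   hosts.sort(cmp=Host.cmp_for_sort)
    # Job._choose_group_to_run() below wraps it the same way to order pending
    # queue entries by hostname.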
2528
2529
mbligh36768f02008-02-22 18:28:33 +00002530class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002531 _table_name = 'host_queue_entries'
2532 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002533 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002534 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002535
2536
showarda3c58572009-03-12 20:36:59 +00002537 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002538 assert id or row
showarda3c58572009-03-12 20:36:59 +00002539 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002540 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002541
jadmanski0afbb632008-06-06 21:10:57 +00002542 if self.host_id:
2543 self.host = Host(self.host_id)
2544 else:
2545 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002546
showard77182562009-06-10 00:16:05 +00002547 if self.atomic_group_id:
2548 self.atomic_group = AtomicGroup(self.atomic_group_id,
2549 always_query=False)
2550 else:
2551 self.atomic_group = None
2552
showard170873e2009-01-07 00:22:26 +00002553 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002554 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002555
2556
showard89f84db2009-03-12 20:39:13 +00002557 @classmethod
2558 def clone(cls, template):
2559 """
2560 Creates a new row using the values from a template instance.
2561
2562 The new instance will not exist in the database or have a valid
2563 id attribute until its save() method is called.
2564 """
2565 assert isinstance(template, cls)
2566 new_row = [getattr(template, field) for field in cls._fields]
2567 clone = cls(row=new_row, new_record=True)
2568 clone.id = None
2569 return clone
2570
2571
showardc85c21b2008-11-24 22:17:37 +00002572 def _view_job_url(self):
2573 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2574
2575
showardf1ae3542009-05-11 19:26:02 +00002576 def get_labels(self):
2577 """
2578 Get all labels associated with this host queue entry (either via the
2579 meta_host or as a job dependency label). The labels yielded are not
2580 guaranteed to be unique.
2581
2582 @yields Label instances associated with this host_queue_entry.
2583 """
2584 if self.meta_host:
2585 yield Label(id=self.meta_host, always_query=False)
2586 labels = Label.fetch(
2587 joins="JOIN jobs_dependency_labels AS deps "
2588 "ON (labels.id = deps.label_id)",
2589 where="deps.job_id = %d" % self.job.id)
2590 for label in labels:
2591 yield label
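    # Usage sketch (illustrative): Job._choose_group_to_run() below walks these
    # labels to pick a more specific group name, e.g.
    #   names = [label.name for label in queue_entry.get_labels()]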
2592
2593
jadmanski0afbb632008-06-06 21:10:57 +00002594 def set_host(self, host):
2595 if host:
2596 self.queue_log_record('Assigning host ' + host.hostname)
2597 self.update_field('host_id', host.id)
2598 self.update_field('active', True)
2599 self.block_host(host.id)
2600 else:
2601 self.queue_log_record('Releasing host')
2602 self.unblock_host(self.host.id)
2603 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002604
jadmanski0afbb632008-06-06 21:10:57 +00002605 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002606
2607
jadmanski0afbb632008-06-06 21:10:57 +00002608 def get_host(self):
2609 return self.host
mbligh36768f02008-02-22 18:28:33 +00002610
2611
jadmanski0afbb632008-06-06 21:10:57 +00002612 def queue_log_record(self, log_line):
2613 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002614 _drone_manager.write_lines_to_file(self.queue_log_path,
2615 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002616
2617
jadmanski0afbb632008-06-06 21:10:57 +00002618 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002619 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002620 row = [0, self.job.id, host_id]
2621 block = IneligibleHostQueue(row=row, new_record=True)
2622 block.save()
mblighe2586682008-02-29 22:45:46 +00002623
2624
jadmanski0afbb632008-06-06 21:10:57 +00002625 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002626 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002627 blocks = IneligibleHostQueue.fetch(
2628 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2629 for block in blocks:
2630 block.delete()
mblighe2586682008-02-29 22:45:46 +00002631
2632
showard2bab8f42008-11-12 18:15:22 +00002633 def set_execution_subdir(self, subdir=None):
2634 if subdir is None:
2635 assert self.get_host()
2636 subdir = self.get_host().hostname
2637 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002638
2639
showard6355f6b2008-12-05 18:52:13 +00002640 def _get_hostname(self):
2641 if self.host:
2642 return self.host.hostname
2643 return 'no host'
2644
2645
showard170873e2009-01-07 00:22:26 +00002646 def __str__(self):
2647 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2648
2649
jadmanski0afbb632008-06-06 21:10:57 +00002650 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002651 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002652
showardb18134f2009-03-20 20:52:18 +00002653 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002654
showardc85c21b2008-11-24 22:17:37 +00002655 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002656 self.update_field('complete', False)
2657 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002658
jadmanski0afbb632008-06-06 21:10:57 +00002659 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002660 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002661 self.update_field('complete', False)
2662 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002663
showardc85c21b2008-11-24 22:17:37 +00002664 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002665 self.update_field('complete', True)
2666 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002667
2668 should_email_status = (status.lower() in _notify_email_statuses or
2669 'all' in _notify_email_statuses)
2670 if should_email_status:
2671 self._email_on_status(status)
2672
2673 self._email_on_job_complete()
2674
2675
2676 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002677 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002678
2679 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2680 self.job.id, self.job.name, hostname, status)
2681 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2682 self.job.id, self.job.name, hostname, status,
2683 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002684 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002685
2686
2687 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002688 if not self.job.is_finished():
2689 return
showard542e8402008-09-19 20:16:18 +00002690
showardc85c21b2008-11-24 22:17:37 +00002691 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002692 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002693 for queue_entry in hosts_queue:
2694 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002695 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002696 queue_entry.status))
2697
2698 summary_text = "\n".join(summary_text)
2699 status_counts = models.Job.objects.get_status_counts(
2700 [self.job.id])[self.job.id]
2701 status = ', '.join('%d %s' % (count, status) for status, count
2702 in status_counts.iteritems())
2703
2704 subject = 'Autotest: Job ID: %s "%s" %s' % (
2705 self.job.id, self.job.name, status)
2706 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2707 self.job.id, self.job.name, status, self._view_job_url(),
2708 summary_text)
showard170873e2009-01-07 00:22:26 +00002709 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002710
2711
showard77182562009-06-10 00:16:05 +00002712 def run_pre_job_tasks(self, assigned_host=None):
2713 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002714 assert assigned_host
2715 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002716 if self.host_id is None:
2717 self.set_host(assigned_host)
2718 else:
2719 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002720
showardcfd4a7e2009-07-11 01:47:33 +00002721 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002722 self.job.name, self.meta_host, self.atomic_group_id,
2723 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002724
showard77182562009-06-10 00:16:05 +00002725 return self._do_run_pre_job_tasks()
2726
2727
2728 def _do_run_pre_job_tasks(self):
2729        # Every host goes through the Verifying stage (which may or may not
2730 # actually do anything as determined by get_pre_job_tasks).
2731 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2732
2733 # The pre job tasks always end with a SetEntryPendingTask which
2734 # will continue as appropriate through queue_entry.on_pending().
2735 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002736
showard6ae5ea92009-02-25 00:11:51 +00002737
jadmanski0afbb632008-06-06 21:10:57 +00002738 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002739 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002740 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002741 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002742 # verify/cleanup failure sets the execution subdir, so reset it here
2743 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002744 if self.meta_host:
2745 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002746
2747
jadmanski0afbb632008-06-06 21:10:57 +00002748 def handle_host_failure(self):
2749 """\
2750 Called when this queue entry's host has failed verification and
2751 repair.
2752 """
2753 assert not self.meta_host
2754 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002755 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002756
2757
jadmanskif7fa2cc2008-10-01 14:13:23 +00002758 @property
2759 def aborted_by(self):
2760 self._load_abort_info()
2761 return self._aborted_by
2762
2763
2764 @property
2765 def aborted_on(self):
2766 self._load_abort_info()
2767 return self._aborted_on
2768
2769
2770 def _load_abort_info(self):
2771 """ Fetch info about who aborted the job. """
2772 if hasattr(self, "_aborted_by"):
2773 return
2774 rows = _db.execute("""
2775 SELECT users.login, aborted_host_queue_entries.aborted_on
2776 FROM aborted_host_queue_entries
2777 INNER JOIN users
2778 ON users.id = aborted_host_queue_entries.aborted_by_id
2779 WHERE aborted_host_queue_entries.queue_entry_id = %s
2780 """, (self.id,))
2781 if rows:
2782 self._aborted_by, self._aborted_on = rows[0]
2783 else:
2784 self._aborted_by = self._aborted_on = None
2785
2786
showardb2e2c322008-10-14 17:33:55 +00002787 def on_pending(self):
2788 """
2789 Called when an entry in a synchronous job has passed verify. If the
2790 job is ready to run, returns an agent to run the job. Returns None
2791 otherwise.
2792 """
2793 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002794 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002795
2796 # Some debug code here: sends an email if an asynchronous job does not
2797 # immediately enter Starting.
2798 # TODO: Remove this once we figure out why asynchronous jobs are getting
2799 # stuck in Pending.
2800 agent = self.job.run_if_ready(queue_entry=self)
2801 if self.job.synch_count == 1 and agent is None:
2802 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2803 message = 'Asynchronous job stuck in Pending'
2804 email_manager.manager.enqueue_notify_email(subject, message)
2805 return agent
showardb2e2c322008-10-14 17:33:55 +00002806
2807
showardd3dc1992009-04-22 21:01:40 +00002808 def abort(self, dispatcher):
2809 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002810
showardd3dc1992009-04-22 21:01:40 +00002811 Status = models.HostQueueEntry.Status
2812 has_running_job_agent = (
2813 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2814 and dispatcher.get_agents_for_entry(self))
2815 if has_running_job_agent:
2816 # do nothing; post-job tasks will finish and then mark this entry
2817 # with status "Aborted" and take care of the host
2818 return
2819
2820 if self.status in (Status.STARTING, Status.PENDING):
2821 self.host.set_status(models.Host.Status.READY)
2822 elif self.status == Status.VERIFYING:
2823 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2824
2825 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002826
2827 def execution_tag(self):
2828 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002829 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002830
2831
showarded2afea2009-07-07 20:54:07 +00002832 def execution_path(self):
2833 return self.execution_tag()
2834
2835
mbligh36768f02008-02-22 18:28:33 +00002836class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002837 _table_name = 'jobs'
2838 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2839 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002840 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002841 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002842
showard77182562009-06-10 00:16:05 +00002843 # This does not need to be a column in the DB. The delays are likely to
2844 # be configured short. If the scheduler is stopped and restarted in
2845 # the middle of a job's delay cycle, the delay cycle will either be
2846 # repeated or skipped depending on the number of Pending machines found
2847 # when the restarted scheduler recovers to track it. Not a problem.
2848 #
2849 # A reference to the DelayedCallTask that will wake up the job should
2850 # no other HQEs change state in time. Its end_time attribute is used
2851 # by our run_with_ready_delay() method to determine if the wait is over.
2852 _delay_ready_task = None
2853
2854 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2855    # all status='Pending' atomic group HQEs in case a delay was running when the
2856 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002857
showarda3c58572009-03-12 20:36:59 +00002858 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002859 assert id or row
showarda3c58572009-03-12 20:36:59 +00002860 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002861
mblighe2586682008-02-29 22:45:46 +00002862
jadmanski0afbb632008-06-06 21:10:57 +00002863 def is_server_job(self):
2864 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002865
2866
showard170873e2009-01-07 00:22:26 +00002867 def tag(self):
2868 return "%s-%s" % (self.id, self.owner)
2869
2870
jadmanski0afbb632008-06-06 21:10:57 +00002871 def get_host_queue_entries(self):
2872 rows = _db.execute("""
2873 SELECT * FROM host_queue_entries
2874 WHERE job_id= %s
2875 """, (self.id,))
2876 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002877
jadmanski0afbb632008-06-06 21:10:57 +00002878        assert len(entries) > 0
mbligh36768f02008-02-22 18:28:33 +00002879
jadmanski0afbb632008-06-06 21:10:57 +00002880 return entries
mbligh36768f02008-02-22 18:28:33 +00002881
2882
jadmanski0afbb632008-06-06 21:10:57 +00002883 def set_status(self, status, update_queues=False):
2884        self.update_field('status', status)
2885
2886 if update_queues:
2887 for queue_entry in self.get_host_queue_entries():
2888 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002889
2890
showard77182562009-06-10 00:16:05 +00002891 def _atomic_and_has_started(self):
2892 """
2893 @returns True if any of the HostQueueEntries associated with this job
2894 have entered the Status.STARTING state or beyond.
2895 """
2896 atomic_entries = models.HostQueueEntry.objects.filter(
2897 job=self.id, atomic_group__isnull=False)
2898 if atomic_entries.count() <= 0:
2899 return False
2900
showardaf8b4ca2009-06-16 18:47:26 +00002901 # These states may *only* be reached if Job.run() has been called.
2902 started_statuses = (models.HostQueueEntry.Status.STARTING,
2903 models.HostQueueEntry.Status.RUNNING,
2904 models.HostQueueEntry.Status.COMPLETED)
2905
2906 started_entries = atomic_entries.filter(status__in=started_statuses)
showard77182562009-06-10 00:16:05 +00002907 return started_entries.count() > 0
2908
2909
showard708b3522009-08-20 23:26:15 +00002910 def _hosts_assigned_count(self):
2911 """The number of HostQueueEntries assigned a Host for this job."""
2912 entries = models.HostQueueEntry.objects.filter(job=self.id,
2913 host__isnull=False)
2914 return entries.count()
2915
2916
showard77182562009-06-10 00:16:05 +00002917 def _pending_count(self):
2918 """The number of HostQueueEntries for this job in the Pending state."""
2919 pending_entries = models.HostQueueEntry.objects.filter(
2920 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2921 return pending_entries.count()
2922
2923
jadmanski0afbb632008-06-06 21:10:57 +00002924 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002925 # NOTE: Atomic group jobs stop reporting ready after they have been
2926 # started to avoid launching multiple copies of one atomic job.
2927        # Only possible if synch_count is less than half the number of
2928 # machines in the atomic group.
showardb000a8d2009-07-28 20:02:07 +00002929 pending_count = self._pending_count()
2930 atomic_and_has_started = self._atomic_and_has_started()
2931 ready = (pending_count >= self.synch_count
2932                 and not atomic_and_has_started)
2933
2934 if not ready:
2935 logging.info(
2936 'Job %s not ready: %s pending, %s required '
2937 '(Atomic and started: %s)',
2938 self, pending_count, self.synch_count,
2939 atomic_and_has_started)
2940
2941 return ready
mbligh36768f02008-02-22 18:28:33 +00002942
2943
jadmanski0afbb632008-06-06 21:10:57 +00002944    def num_machines(self, clause=None):
2945 sql = "job_id=%s" % self.id
2946 if clause:
2947 sql += " AND (%s)" % clause
2948 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002949
2950
jadmanski0afbb632008-06-06 21:10:57 +00002951 def num_queued(self):
2952 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002953
2954
jadmanski0afbb632008-06-06 21:10:57 +00002955 def num_active(self):
2956 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002957
2958
jadmanski0afbb632008-06-06 21:10:57 +00002959 def num_complete(self):
2960 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002961
2962
jadmanski0afbb632008-06-06 21:10:57 +00002963 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002964 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002965
mbligh36768f02008-02-22 18:28:33 +00002966
showard6bb7c292009-01-30 01:44:51 +00002967 def _not_yet_run_entries(self, include_verifying=True):
2968 statuses = [models.HostQueueEntry.Status.QUEUED,
2969 models.HostQueueEntry.Status.PENDING]
2970 if include_verifying:
2971 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2972 return models.HostQueueEntry.objects.filter(job=self.id,
2973 status__in=statuses)
2974
2975
2976 def _stop_all_entries(self):
2977 entries_to_stop = self._not_yet_run_entries(
2978 include_verifying=False)
2979 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002980 assert not child_entry.complete, (
2981 '%s status=%s, active=%s, complete=%s' %
2982 (child_entry.id, child_entry.status, child_entry.active,
2983 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002984 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2985 child_entry.host.status = models.Host.Status.READY
2986 child_entry.host.save()
2987 child_entry.status = models.HostQueueEntry.Status.STOPPED
2988 child_entry.save()
2989
showard2bab8f42008-11-12 18:15:22 +00002990 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002991 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002992 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002993 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002994
2995
jadmanski0afbb632008-06-06 21:10:57 +00002996 def write_to_machines_file(self, queue_entry):
2997 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002998 file_path = os.path.join(self.tag(), '.machines')
2999 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00003000
3001
showardf1ae3542009-05-11 19:26:02 +00003002 def _next_group_name(self, group_name=''):
3003 """@returns a directory name to use for the next host group results."""
3004 if group_name:
3005 # Sanitize for use as a pathname.
3006 group_name = group_name.replace(os.path.sep, '_')
3007 if group_name.startswith('.'):
3008 group_name = '_' + group_name[1:]
3009 # Add a separator between the group name and 'group%d'.
3010 group_name += '.'
3011 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00003012 query = models.HostQueueEntry.objects.filter(
3013 job=self.id).values('execution_subdir').distinct()
3014 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00003015 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
3016 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00003017 if ids:
3018 next_id = max(ids) + 1
3019 else:
3020 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00003021 return '%sgroup%d' % (group_name, next_id)
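    # For example, with no prior groups recorded for the job this returns
    # 'group0', and 'rack1.group0' when called with group_name='rack1'
    # ('rack1' is only an illustrative label name).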
showard2bab8f42008-11-12 18:15:22 +00003022
3023
showard170873e2009-01-07 00:22:26 +00003024 def _write_control_file(self, execution_tag):
3025 control_path = _drone_manager.attach_file_to_execution(
3026 execution_tag, self.control_file)
3027 return control_path
mbligh36768f02008-02-22 18:28:33 +00003028
showardb2e2c322008-10-14 17:33:55 +00003029
showard2bab8f42008-11-12 18:15:22 +00003030 def get_group_entries(self, queue_entry_from_group):
3031 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00003032 return list(HostQueueEntry.fetch(
3033 where='job_id=%s AND execution_subdir=%s',
3034 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00003035
3036
showardb2e2c322008-10-14 17:33:55 +00003037 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00003038 assert queue_entries
3039 execution_tag = queue_entries[0].execution_tag()
3040 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00003041 hostnames = ','.join([entry.get_host().hostname
3042 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00003043
showard87ba02a2009-04-20 19:37:32 +00003044 params = _autoserv_command_line(
showarded2afea2009-07-07 20:54:07 +00003045 hostnames,
showard87ba02a2009-04-20 19:37:32 +00003046 ['-P', execution_tag, '-n',
3047 _drone_manager.absolute_path(control_path)],
showarde9c69362009-06-30 01:58:03 +00003048 job=self, verbose=False)
mbligh36768f02008-02-22 18:28:33 +00003049
jadmanski0afbb632008-06-06 21:10:57 +00003050 if not self.is_server_job():
3051 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00003052
showardb2e2c322008-10-14 17:33:55 +00003053 return params
mblighe2586682008-02-29 22:45:46 +00003054
mbligh36768f02008-02-22 18:28:33 +00003055
showardc9ae1782009-01-30 01:42:37 +00003056 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00003057 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00003058 return True
showard0fc38302008-10-23 00:44:07 +00003059 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00003060 return queue_entry.get_host().dirty
3061 return False
showard21baa452008-10-21 00:08:39 +00003062
showardc9ae1782009-01-30 01:42:37 +00003063
showard2fe3f1d2009-07-06 20:19:11 +00003064 def should_run_verify(self, queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003065 do_not_verify = (queue_entry.host.protection ==
3066 host_protections.Protection.DO_NOT_VERIFY)
3067 if do_not_verify:
3068 return False
3069 return self.run_verify
3070
3071
showard77182562009-06-10 00:16:05 +00003072 def get_pre_job_tasks(self, queue_entry):
3073 """
3074 Get a list of tasks to perform before the host_queue_entry
3075 may be used to run this Job (such as Cleanup & Verify).
3076
3077 @returns A list of tasks to be done to the given queue_entry before
mbligh6fbdb802009-08-03 16:42:55 +00003078                 it should be considered ready to run this job. The last
showard77182562009-06-10 00:16:05 +00003079 task in the list calls HostQueueEntry.on_pending(), which
3080 continues the flow of the job.
3081 """
showard21baa452008-10-21 00:08:39 +00003082 tasks = []
showardc9ae1782009-01-30 01:42:37 +00003083 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00003084 tasks.append(CleanupTask(queue_entry=queue_entry))
showard2fe3f1d2009-07-06 20:19:11 +00003085 if self.should_run_verify(queue_entry):
showardc9ae1782009-01-30 01:42:37 +00003086 tasks.append(VerifyTask(queue_entry=queue_entry))
3087 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00003088 return tasks
3089
3090
showardf1ae3542009-05-11 19:26:02 +00003091 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00003092 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00003093 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00003094 else:
showardf1ae3542009-05-11 19:26:02 +00003095 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00003096 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00003097 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00003098 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003099
3100 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00003101 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00003102
3103
3104 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00003105 """
3106        @returns A tuple (queue_entries, group_name): the HostQueueEntry
3107                 instances to be used to run this Job, plus a group name to
showard54c1ea92009-05-20 00:32:58 +00003108                 suggest for this group in the results database.
showardf1ae3542009-05-11 19:26:02 +00003109 """
showard77182562009-06-10 00:16:05 +00003110 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00003111 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00003112 if atomic_group:
3113 num_entries_wanted = atomic_group.max_number_of_machines
3114 else:
3115 num_entries_wanted = self.synch_count
3116 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00003117
showardf1ae3542009-05-11 19:26:02 +00003118 if num_entries_wanted > 0:
3119 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00003120 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00003121 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00003122 params=(self.id, include_queue_entry.id)))
3123
3124 # Sort the chosen hosts by hostname before slicing.
3125 def cmp_queue_entries_by_hostname(entry_a, entry_b):
3126 return Host.cmp_for_sort(entry_a.host, entry_b.host)
3127 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
3128 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00003129
showardf1ae3542009-05-11 19:26:02 +00003130 # Sanity check. We'll only ever be called if this can be met.
3131 assert len(chosen_entries) >= self.synch_count
3132
3133 if atomic_group:
3134 # Look at any meta_host and dependency labels and pick the first
3135 # one that also specifies this atomic group. Use that label name
3136 # as the group name if possible (it is more specific).
3137 group_name = atomic_group.name
3138 for label in include_queue_entry.get_labels():
3139 if label.atomic_group_id:
3140 assert label.atomic_group_id == atomic_group.id
3141 group_name = label.name
3142 break
3143 else:
3144 group_name = ''
3145
3146 self._assign_new_group(chosen_entries, group_name=group_name)
3147 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00003148
3149
showard77182562009-06-10 00:16:05 +00003150 def run_if_ready(self, queue_entry):
3151 """
3152 @returns An Agent instance to ultimately run this job if enough hosts
3153 are ready for it to run.
3154        @returns None, possibly after stopping excess queue entries, if this
3155                 Job is not ready to run yet.
3156 """
showardb2e2c322008-10-14 17:33:55 +00003157 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003158 self.stop_if_necessary()
3159 return None
mbligh36768f02008-02-22 18:28:33 +00003160
showard77182562009-06-10 00:16:05 +00003161 if queue_entry.atomic_group:
3162 return self.run_with_ready_delay(queue_entry)
3163
3164 return self.run(queue_entry)
3165
3166
3167 def run_with_ready_delay(self, queue_entry):
3168 """
3169 Start a delay to wait for more hosts to enter Pending state before
3170        launching an atomic group job. Once set, the delay cannot be reset.
3171
3172 @param queue_entry: The HostQueueEntry object to get atomic group
3173 info from and pass to run_if_ready when the delay is up.
3174
3175 @returns An Agent to run the job as appropriate or None if a delay
3176 has already been set.
3177 """
3178 assert queue_entry.job_id == self.id
3179 assert queue_entry.atomic_group
3180 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
showard708b3522009-08-20 23:26:15 +00003181 pending_threshold = min(self._hosts_assigned_count(),
3182 queue_entry.atomic_group.max_number_of_machines)
showard77182562009-06-10 00:16:05 +00003183 over_max_threshold = (self._pending_count() >= pending_threshold)
3184 delay_expired = (self._delay_ready_task and
3185 time.time() >= self._delay_ready_task.end_time)
3186
3187 # Delay is disabled or we already have enough? Do not wait to run.
3188 if not delay or over_max_threshold or delay_expired:
3189 return self.run(queue_entry)
3190
3191 # A delay was previously scheduled.
3192 if self._delay_ready_task:
3193 return None
3194
3195 def run_job_after_delay():
3196 logging.info('Job %s done waiting for extra hosts.', self.id)
3197 return self.run(queue_entry)
3198
showard708b3522009-08-20 23:26:15 +00003199 logging.info('Job %s waiting up to %s seconds for more hosts.',
3200 self.id, delay)
showard77182562009-06-10 00:16:05 +00003201 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3202 callback=run_job_after_delay)
3203
3204 return Agent([self._delay_ready_task], num_processes=0)
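    # Rough flow: the first atomic-group entry to reach Pending schedules the
    # DelayedCallTask above; each later entry re-enters run_with_ready_delay()
    # via on_pending(), and the job starts once enough hosts are Pending or the
    # delay has expired, whichever comes first.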
3205
3206
3207 def run(self, queue_entry):
3208 """
3209 @param queue_entry: The HostQueueEntry instance calling this method.
3210 @returns An Agent instance to run this job or None if we've already
3211 been run.
3212 """
3213 if queue_entry.atomic_group and self._atomic_and_has_started():
3214 logging.error('Job.run() called on running atomic Job %d '
3215 'with HQE %s.', self.id, queue_entry)
3216 return None
showardf1ae3542009-05-11 19:26:02 +00003217 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3218 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003219
3220
showardf1ae3542009-05-11 19:26:02 +00003221 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003222 for queue_entry in queue_entries:
3223 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003224 params = self._get_autoserv_params(queue_entries)
3225 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003226 cmd=params, group_name=group_name)
3227 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003228 if self._delay_ready_task:
3229 # Cancel any pending callback that would try to run again
3230 # as we are already running.
3231 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003232
showard170873e2009-01-07 00:22:26 +00003233 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003234
3235
showardb000a8d2009-07-28 20:02:07 +00003236 def __str__(self):
3237 return '%s-%s' % (self.id, self.owner)
3238
3239
mbligh36768f02008-02-22 18:28:33 +00003240if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00003241 main()