blob: 537671fd934009701907aba73076b77898b2ee06 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showard136e6dc2009-06-10 19:38:49 +000010import itertools, logging, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard043c62a2009-06-10 19:48:57 +000013from autotest_lib.scheduler import scheduler_logging_config
showard21baa452008-10-21 00:08:39 +000014from autotest_lib.frontend import setup_django_environment
showard136e6dc2009-06-10 19:38:49 +000015from autotest_lib.client.common_lib import global_config, logging_manager
showardb18134f2009-03-20 20:52:18 +000016from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000017from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000018from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000019from autotest_lib.scheduler import drone_manager, drones, email_manager
showard043c62a2009-06-10 19:48:57 +000020from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000021from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000022
mblighb090f142008-02-27 21:33:46 +000023
# Default results directory; overridden by the command-line argument in
# main_without_exception_handling().
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow an installation to point the scheduler at a different autotest tree.
if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names dropped into execution directories by the processes we spawn
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level mutable state, populated by main()/init().
_db = None            # database_connection.DatabaseConnection, set in init()
_shutdown = False     # flipped to True by handle_sigint() to stop the loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False        # set by --test; uses dummy autoserv
_base_url = None             # frontend URL used when composing notifications
_notify_email_statuses = []  # statuses that trigger notification emails
_drone_manager = drone_manager.DroneManager()
63
mbligh83c1e9e2009-05-01 23:10:41 +000064def _site_init_monitor_db_dummy():
65 return {}
66
67
def main():
    """Entry point: run the scheduler, logging any unexpected exception
    before re-raising it so the failure is visible in the scheduler log."""
    try:
        main_without_exception_handling()
    except SystemExit:
        # sys.exit() is a normal shutdown path; propagate without logging.
        raise
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
76
77
def main_without_exception_handling():
    """
    Run the scheduler: parse the command line, read global configuration,
    initialize module globals and the Dispatcher, then loop on
    Dispatcher.tick() until SIGINT sets the module-level _shutdown flag.

    usage: monitor_db.py [options] results_dir

    Exits with status 1 if the scheduler is disabled in the global config,
    or if no base URL / server hostname can be determined.
    """
    setup_logging()

    usage = 'usage: %prog [options] results_dir'
    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    scheduler_enabled = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)

    if not scheduler_enabled:
        # Operator-facing message; grammar fixed ("to enabled it").
        msg = ("Scheduler not enabled, set enable_scheduler to true in the "
               "global_config's SCHEDULER section to enable it. Exiting.")
        print(msg)
        sys.exit(1)

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Give site-specific code a chance to hook in; falls back to the no-op
    # dummy when no site_monitor_db module exists.
    site_init = utils.import_site_function(__file__,
        "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
        _site_init_monitor_db_dummy)
    site_init()

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # Accept whitespace, comma, semicolon or colon as separators.
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]',
                                       notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init()
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # Report, but fall through so the cleanup below still runs.
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000165
166
def setup_logging():
    """Configure scheduler logging, honoring environment overrides.

    AUTOTEST_SCHEDULER_LOG_DIR and AUTOTEST_SCHEDULER_LOG_NAME each default
    to None when unset, letting the logging config choose its own values.
    """
    environ_get = os.environ.get
    logging_manager.configure_logging(
            scheduler_logging_config.SchedulerLoggingConfig(),
            log_dir=environ_get('AUTOTEST_SCHEDULER_LOG_DIR', None),
            logfile_name=environ_get('AUTOTEST_SCHEDULER_LOG_NAME', None))
173
174
def handle_sigint(signum, frame):
    """SIGINT handler installed by init().

    Only flips the module-level _shutdown flag; the main loop in
    main_without_exception_handling() notices it on its next iteration and
    performs an orderly shutdown (emails flushed, drones stopped, etc.).
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000179
180
def init():
    """One-time scheduler startup: pidfile, DB connection, Django autocommit,
    signal handler, and drone manager initialization."""
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # Point at the stress-test database instead of the live one.
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # NOTE(review): this local `drones` shadows the imported `drones` module
    # for the rest of this function — confusing but harmless here.
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000212
213
def _autoserv_command_line(machines, extra_args, job=None, queue_entry=None,
                           verbose=True):
    """
    Build the autoserv command line.

    @param machines - string - A machine or comma separated list of machines
            for the (-m) flag.
    @param extra_args - list - Additional arguments to pass to autoserv.
    @param job - Job object - If supplied, -u owner and -l name parameters
            will be added.
    @param queue_entry - A HostQueueEntry object - If supplied and no Job
            object was supplied, this will be used to lookup the Job object.
    @param verbose - bool - Whether to pass --verbose to autoserv.

    @returns The autoserv command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', drone_manager.WORKING_DIRECTORY]
    if job is None and queue_entry is not None:
        job = queue_entry.job
    if job is not None:
        command.extend(['-u', job.owner, '-l', job.name])
    if verbose:
        command.append('--verbose')
    return command + extra_args
236
237
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs."""
240
241
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against ready hosts.

    refresh() snapshots the relevant DB state (ready hosts, ACLs,
    dependencies, labels) into in-memory dicts; the find_eligible_* methods
    then consume that snapshot, removing hosts from the internal caches as
    they are handed out so a host is never scheduled twice in one cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for unlocked, Ready, non-busy hosts."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated string for SQL IN()."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run `query` (with a %s IN-list placeholder) and fold the two-column
        result into {left_id: set(right_id)} (reversed when flip=True)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Build {left: set(right)} from two-column rows; flip swaps columns."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Return {job_id: set(aclgroup_id)} via each job's owner's ACLs."""
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Return {job_id: set(host_id)} of hosts barred from each job."""
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Return {job_id: set(label_id)} of each job's dependency labels."""
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Return {host_id: set(aclgroup_id)}."""
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) dicts for `host_ids`."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot DB state needed to schedule `pending_queue_entries`."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels cover every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested via its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels, queue_entry) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels, queue_entry=None):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels: A list of label ids that the host has.
        @param queue_entry: The HostQueueEntry we're testing.  Only used for
                extra info in a potential logged error message.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        """
        atomic_labels = [self._labels[label_id] for label_id in host_labels
                         if self._labels[label_id].atomic_group_id is not None]
        atomic_ids = set(label.atomic_group_id for label in atomic_labels)
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            # NOTE: despite the docstring above, this logs rather than raises
            # and returns an arbitrary member of the set.
            logging.error('More than one Atomic Group on HQE "%s" via: %r',
                          queue_entry, atomic_labels)
        return atomic_ids.pop()


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if host passes ACL, dependency, only_if_needed and atomic
        group checks for this entry (invalid one-time hosts always pass)."""
        if self._is_host_invalid(host_id):
            # if an invalid host is scheduled for a job, it's a one-time host
            # and it therefore bypasses eligibility checks. note this can only
            # happen for non-metahosts, because invalid hosts have their label
            # relationships cleared.
            return True

        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _is_host_invalid(self, host_id):
        """True if the host is known and flagged invalid in the snapshot."""
        host_object = self._hosts_available.get(host_id, None)
        return host_object and host_object.invalid


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if eligible; else return None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still available this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim some eligible host carrying the entry's metahost label, or
        return None if no such host is available."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a Host for this (non-atomic-group) entry, or None."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
605
606
showard170873e2009-01-07 00:22:26 +0000607class Dispatcher(object):
    def __init__(self):
        # Agents currently owned by the dispatcher.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # Indexes from host id / queue entry id to the set of agents touching
        # that object; maintained by add_agent() and remove_agent().
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000618
mbligh36768f02008-02-22 18:28:33 +0000619
    def initialize(self, recover_hosts=True):
        """One-time startup: run cleanup initialization and recover state
        left over from a previous scheduler instance.

        @param recover_hosts: when True, also attempt to recover dead hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000629
630
    def tick(self):
        """One scheduler iteration, called repeatedly by the main loop.

        The ordering matters: drone state is refreshed first, new work is
        scheduled before agents are driven, and queued drone actions and
        emails are flushed last.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000641
showard97aed502008-11-04 02:01:24 +0000642
    def _run_cleanup(self):
        # Each cleanup object tracks its own schedule and no-ops when it is
        # not yet due — hence "maybe".
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000646
mbligh36768f02008-02-22 18:28:33 +0000647
showard170873e2009-01-07 00:22:26 +0000648 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
649 for object_id in object_ids:
650 agent_dict.setdefault(object_id, set()).add(agent)
651
652
653 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
654 for object_id in object_ids:
655 assert object_id in agent_dict
656 agent_dict[object_id].remove(agent)
657
658
jadmanski0afbb632008-06-06 21:10:57 +0000659 def add_agent(self, agent):
660 self._agents.append(agent)
661 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000662 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
663 self._register_agent_for_ids(self._queue_entry_agents,
664 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def get_agents_for_entry(self, queue_entry):
668 """
669 Find agents corresponding to the specified queue_entry.
670 """
showardd3dc1992009-04-22 21:01:40 +0000671 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000672
673
674 def host_has_agent(self, host):
675 """
676 Determine if there is currently an Agent present using this host.
677 """
678 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000679
680
jadmanski0afbb632008-06-06 21:10:57 +0000681 def remove_agent(self, agent):
682 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000683 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
684 agent)
685 self._unregister_agent_for_ids(self._queue_entry_agents,
686 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000687
688
    def _recover_processes(self):
        """Reattach to processes left over from a previous scheduler run.

        Order matters: pidfiles must be registered before the drone
        refresh so their contents are fetched, and entries must be
        recovered before deciding which hosts still need reverification.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_starting_entries()
        self._check_for_remaining_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000700
showard170873e2009-01-07 00:22:26 +0000701
702 def _register_pidfiles(self):
703 # during recovery we may need to read pidfiles for both running and
704 # parsing entries
705 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000706 where="status IN ('Running', 'Gathering', 'Parsing')")
showarded2afea2009-07-07 20:54:07 +0000707 special_tasks = models.SpecialTask.objects.filter(is_active=True)
708 for execution_entry in itertools.chain(queue_entries, special_tasks):
showardd3dc1992009-04-22 21:01:40 +0000709 for pidfile_name in _ALL_PIDFILE_NAMES:
710 pidfile_id = _drone_manager.get_pidfile_id_from(
showarded2afea2009-07-07 20:54:07 +0000711 execution_entry.execution_path(), pidfile_name=pidfile_name)
showardd3dc1992009-04-22 21:01:40 +0000712 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000713
714
showarded2afea2009-07-07 20:54:07 +0000715 def _get_recovery_run_monitor(self, execution_path, pidfile_name, orphans):
716 run_monitor = PidfileRunMonitor()
717 run_monitor.attach_to_existing_process(execution_path,
718 pidfile_name=pidfile_name)
719 if run_monitor.has_process():
720 orphans.discard(run_monitor.get_process())
721 return run_monitor, '(process %s)' % run_monitor.get_process()
722 return None, 'without process'
723
724
showardd3dc1992009-04-22 21:01:40 +0000725 def _recover_entries_with_status(self, status, orphans, pidfile_name,
726 recover_entries_fn):
727 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000728 for queue_entry in queue_entries:
729 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000730 # synchronous job we've already recovered
731 continue
showardd3dc1992009-04-22 21:01:40 +0000732 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showarded2afea2009-07-07 20:54:07 +0000733 run_monitor, process_string = self._get_recovery_run_monitor(
734 queue_entry.execution_path(), pidfile_name, orphans)
showard597bfd32009-05-08 18:22:50 +0000735
showarded2afea2009-07-07 20:54:07 +0000736 logging.info('Recovering %s entry %s %s',status.lower(),
737 ', '.join(str(entry) for entry in queue_entries),
738 process_string)
showardd3dc1992009-04-22 21:01:40 +0000739 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000740
741
showard6878e8b2009-07-20 22:37:45 +0000742 def _check_for_remaining_orphan_processes(self, orphans):
743 if not orphans:
744 return
745 subject = 'Unrecovered orphan autoserv processes remain'
746 message = '\n'.join(str(process) for process in orphans)
747 email_manager.manager.enqueue_notify_email(subject, message)
748 raise RuntimeError(subject + '\n' + message)
jadmanski0afbb632008-06-06 21:10:57 +0000749
showard170873e2009-01-07 00:22:26 +0000750
showardd3dc1992009-04-22 21:01:40 +0000751 def _recover_running_entries(self, orphans):
752 def recover_entries(job, queue_entries, run_monitor):
753 if run_monitor is not None:
showarded2afea2009-07-07 20:54:07 +0000754 queue_task = QueueTask(job=job, queue_entries=queue_entries,
755 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000756 self.add_agent(Agent(tasks=[queue_task],
757 num_processes=len(queue_entries)))
showard6878e8b2009-07-20 22:37:45 +0000758 else:
759 # we could do better, but this retains legacy behavior for now
760 for queue_entry in queue_entries:
761 logging.info('Requeuing running HQE %s since it has no '
762 'process' % queue_entry)
763 queue_entry.requeue()
showardd3dc1992009-04-22 21:01:40 +0000764
765 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
showarded2afea2009-07-07 20:54:07 +0000766 orphans, _AUTOSERV_PID_FILE,
showardd3dc1992009-04-22 21:01:40 +0000767 recover_entries)
768
769
770 def _recover_gathering_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 gather_task = GatherLogsTask(job, queue_entries,
showarded2afea2009-07-07 20:54:07 +0000773 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000774 self.add_agent(Agent([gather_task]))
775
776 self._recover_entries_with_status(
777 models.HostQueueEntry.Status.GATHERING,
778 orphans, _CRASHINFO_PID_FILE, recover_entries)
779
780
781 def _recover_parsing_entries(self, orphans):
782 def recover_entries(job, queue_entries, run_monitor):
783 reparse_task = FinalReparseTask(queue_entries,
showarded2afea2009-07-07 20:54:07 +0000784 recover_run_monitor=run_monitor)
showardd3dc1992009-04-22 21:01:40 +0000785 self.add_agent(Agent([reparse_task], num_processes=0))
786
787 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
788 orphans, _PARSER_PID_FILE,
789 recover_entries)
790
791
    def _recover_all_recoverable_entries(self):
        """Recover every queue entry and special task that may own a
        process; whatever is still in orphans afterwards was claimed by
        nobody and triggers a failure."""
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._recover_special_tasks(orphans)
        self._check_for_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000799
showard97aed502008-11-04 02:01:24 +0000800
    def _recover_special_tasks(self, orphans):
        """\
        Recovers all special tasks that have started running but have not
        completed.
        """

        tasks = models.SpecialTask.objects.filter(is_active=True,
                                                  is_complete=False)
        # Use ordering to force NULL queue_entry_id's to the end of the list
        for task in tasks.order_by('-queue_entry_id'):
            # recovery runs before any new agents exist, so no host should
            # have an agent yet
            assert not self.host_has_agent(task.host)

            # wrap the Django rows in the scheduler's own DB objects
            host = Host(id=task.host.id)
            queue_entry = None
            if task.queue_entry:
                queue_entry = HostQueueEntry(id=task.queue_entry.id)

            run_monitor, process_string = self._get_recovery_run_monitor(
                task.execution_path(), _AUTOSERV_PID_FILE, orphans)

            logging.info('Recovering %s %s', task, process_string)
            self._recover_special_task(task, host, queue_entry, run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000823
824
showarded2afea2009-07-07 20:54:07 +0000825 def _recover_special_task(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000826 """\
827 Recovers a single special task.
828 """
829 if task.task == models.SpecialTask.Task.VERIFY:
showarded2afea2009-07-07 20:54:07 +0000830 agent_tasks = self._recover_verify(task, host, queue_entry,
831 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000832 elif task.task == models.SpecialTask.Task.REPAIR:
showarded2afea2009-07-07 20:54:07 +0000833 agent_tasks = self._recover_repair(task, host, queue_entry,
834 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000835 elif task.task == models.SpecialTask.Task.CLEANUP:
showarded2afea2009-07-07 20:54:07 +0000836 agent_tasks = self._recover_cleanup(task, host, queue_entry,
837 run_monitor)
showard2fe3f1d2009-07-06 20:19:11 +0000838 else:
839 # Should never happen
840 logging.error(
841 "Special task id %d had invalid task %s", (task.id, task.task))
842
843 self.add_agent(Agent(agent_tasks))
844
845
showarded2afea2009-07-07 20:54:07 +0000846 def _recover_verify(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000847 """\
848 Recovers a verify task.
849 No associated queue entry: Verify host
850 With associated queue entry: Verify host, and run associated queue
851 entry
852 """
853 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000854 return [VerifyTask(host=host, task=task,
855 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000856 else:
showarded2afea2009-07-07 20:54:07 +0000857 return [VerifyTask(queue_entry=queue_entry, task=task,
858 recover_run_monitor=run_monitor),
showard2fe3f1d2009-07-06 20:19:11 +0000859 SetEntryPendingTask(queue_entry=queue_entry)]
860
861
    def _recover_repair(self, task, host, queue_entry, run_monitor):
        """\
        Recovers a repair task.
        Always repair host
        """
        # queue_entry may be None here (see _recover_special_tasks)
        return [RepairTask(host=host, queue_entry=queue_entry, task=task,
                           recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000869
870
showarded2afea2009-07-07 20:54:07 +0000871 def _recover_cleanup(self, task, host, queue_entry, run_monitor):
showard2fe3f1d2009-07-06 20:19:11 +0000872 """\
873 Recovers a cleanup task.
874 No associated queue entry: Clean host
875 With associated queue entry: Clean host, verify host if needed, and
876 run associated queue entry
877 """
878 if not task.queue_entry:
showarded2afea2009-07-07 20:54:07 +0000879 return [CleanupTask(host=host, task=task,
880 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000881 else:
882 agent_tasks = [CleanupTask(queue_entry=queue_entry,
showarded2afea2009-07-07 20:54:07 +0000883 task=task,
884 recover_run_monitor=run_monitor)]
showard2fe3f1d2009-07-06 20:19:11 +0000885 if queue_entry.job.should_run_verify(queue_entry):
886 agent_tasks.append(VerifyTask(queue_entry=queue_entry))
887 agent_tasks.append(
888 SetEntryPendingTask(queue_entry=queue_entry))
889 return agent_tasks
890
891
showard6af73ad2009-07-28 20:00:58 +0000892 def _requeue_starting_entries(self):
893 # temporary measure until we implement proper recovery of Starting HQEs
894 for entry in HostQueueEntry.fetch(where='status="Starting"'):
895 logging.info('Requeuing "Starting" queue entry %s' % entry)
896 assert not self.get_agents_for_entry(entry)
897 assert entry.host.status == models.Host.Status.PENDING
898 self._reverify_hosts_where('id = %s' % entry.host.id)
899 entry.requeue()
900
901
showard6878e8b2009-07-20 22:37:45 +0000902 def _check_for_remaining_active_entries(self):
showard170873e2009-01-07 00:22:26 +0000903 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000904 where='active AND NOT complete AND '
905 '(aborted OR status != "Pending")')
showardd3dc1992009-04-22 21:01:40 +0000906
showard2fe3f1d2009-07-06 20:19:11 +0000907 message = '\n'.join(str(entry) for entry in queue_entries
908 if not self.get_agents_for_entry(entry))
909 if message:
910 email_manager.manager.enqueue_notify_email(
911 'Unrecovered active host queue entries exist',
912 message)
showard170873e2009-01-07 00:22:26 +0000913
914
showard1ff7b2e2009-05-15 23:17:18 +0000915 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000916 tasks = models.SpecialTask.objects.filter(
showard2fe3f1d2009-07-06 20:19:11 +0000917 task=models.SpecialTask.Task.VERIFY, is_active=False,
918 is_complete=False, queue_entry__isnull=True)
showard6d7b2ff2009-06-10 00:16:47 +0000919
showard2fe3f1d2009-07-06 20:19:11 +0000920 for task in tasks:
921 host = Host.fetch(where='id = %s', params=(task.host.id,)).next()
922 if host.locked or host.invalid or self.host_has_agent(host):
923 continue
showard6d7b2ff2009-06-10 00:16:47 +0000924
showard2fe3f1d2009-07-06 20:19:11 +0000925 logging.info('Force reverifying host %s', host.hostname)
926 self.add_agent(Agent([VerifyTask(host=host, task=task)]))
showard1ff7b2e2009-05-15 23:17:18 +0000927
928
    def _reverify_remaining_hosts(self):
        """Reverify active hosts that no recovery step claimed."""
        # recover active hosts that have not yet been recovered, although this
        # should never happen
        message = ('Recovering active host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where(
            "status IN ('Repairing', 'Verifying', 'Cleaning', 'Running')",
            print_message=message)
mblighbb421852008-03-11 22:36:16 +0000937
938
jadmanski0afbb632008-06-06 21:10:57 +0000939 def _reverify_hosts_where(self, where,
showard2fe3f1d2009-07-06 20:19:11 +0000940 print_message='Reverifying host %s'):
showard170873e2009-01-07 00:22:26 +0000941 full_where='locked = 0 AND invalid = 0 AND ' + where
942 for host in Host.fetch(where=full_where):
943 if self.host_has_agent(host):
944 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000945 continue
showard170873e2009-01-07 00:22:26 +0000946 if print_message:
showardb18134f2009-03-20 20:52:18 +0000947 logging.info(print_message, host.hostname)
showard2fe3f1d2009-07-06 20:19:11 +0000948 tasks = host.reverify_tasks()
showard170873e2009-01-07 00:22:26 +0000949 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000950
951
    def _recover_hosts(self):
        """Reverify hosts whose repair failed in a previous run."""
        # recover "Repair Failed" hosts
        message = 'Reverifying dead host %s'
        self._reverify_hosts_where("status = 'Repair Failed'",
                                   print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000957
958
showard04c82c52008-05-29 19:38:12 +0000959
showardb95b1bd2008-08-15 18:11:04 +0000960 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000961 # prioritize by job priority, then non-metahost over metahost, then FIFO
962 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000963 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000964 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000965 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000966
967
showard89f84db2009-03-12 20:39:13 +0000968 def _refresh_pending_queue_entries(self):
969 """
970 Lookup the pending HostQueueEntries and call our HostScheduler
971 refresh() method given that list. Return the list.
972
973 @returns A list of pending HostQueueEntries sorted in priority order.
974 """
showard63a34772008-08-18 19:32:50 +0000975 queue_entries = self._get_pending_queue_entries()
976 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000977 return []
showardb95b1bd2008-08-15 18:11:04 +0000978
showard63a34772008-08-18 19:32:50 +0000979 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000980
showard89f84db2009-03-12 20:39:13 +0000981 return queue_entries
982
983
984 def _schedule_atomic_group(self, queue_entry):
985 """
986 Schedule the given queue_entry on an atomic group of hosts.
987
988 Returns immediately if there are insufficient available hosts.
989
990 Creates new HostQueueEntries based off of queue_entry for the
991 scheduled hosts and starts them all running.
992 """
993 # This is a virtual host queue entry representing an entire
994 # atomic group, find a group and schedule their hosts.
995 group_hosts = self._host_scheduler.find_eligible_atomic_group(
996 queue_entry)
997 if not group_hosts:
998 return
showardcbe6f942009-06-17 19:33:49 +0000999
1000 logging.info('Expanding atomic group entry %s with hosts %s',
1001 queue_entry,
1002 ', '.join(host.hostname for host in group_hosts))
showard89f84db2009-03-12 20:39:13 +00001003 # The first assigned host uses the original HostQueueEntry
1004 group_queue_entries = [queue_entry]
1005 for assigned_host in group_hosts[1:]:
1006 # Create a new HQE for every additional assigned_host.
1007 new_hqe = HostQueueEntry.clone(queue_entry)
1008 new_hqe.save()
1009 group_queue_entries.append(new_hqe)
1010 assert len(group_queue_entries) == len(group_hosts)
1011 for queue_entry, host in itertools.izip(group_queue_entries,
1012 group_hosts):
1013 self._run_queue_entry(queue_entry, host)
1014
1015
1016 def _schedule_new_jobs(self):
1017 queue_entries = self._refresh_pending_queue_entries()
1018 if not queue_entries:
1019 return
1020
showard63a34772008-08-18 19:32:50 +00001021 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +00001022 if (queue_entry.atomic_group_id is None or
1023 queue_entry.host_id is not None):
1024 assigned_host = self._host_scheduler.find_eligible_host(
1025 queue_entry)
1026 if assigned_host:
1027 self._run_queue_entry(queue_entry, assigned_host)
1028 else:
1029 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +00001030
1031
    def _run_queue_entry(self, queue_entry, host):
        # run_pre_job_tasks() returns the Agent that will drive this entry
        # on its assigned host; register it with the dispatcher
        agent = queue_entry.run_pre_job_tasks(assigned_host=host)
        self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +00001035
1036
jadmanski0afbb632008-06-06 21:10:57 +00001037 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +00001038 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
1039 for agent in self.get_agents_for_entry(entry):
1040 agent.abort()
1041 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +00001042
1043
showard324bf812009-01-20 23:23:38 +00001044 def _can_start_agent(self, agent, num_started_this_cycle,
1045 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +00001046 # always allow zero-process agents to run
1047 if agent.num_processes == 0:
1048 return True
1049 # don't allow any nonzero-process agents to run after we've reached a
1050 # limit (this avoids starvation of many-process agents)
1051 if have_reached_limit:
1052 return False
1053 # total process throttling
showard324bf812009-01-20 23:23:38 +00001054 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +00001055 return False
1056 # if a single agent exceeds the per-cycle throttling, still allow it to
1057 # run when it's the first agent in the cycle
1058 if num_started_this_cycle == 0:
1059 return True
1060 # per-cycle throttling
1061 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +00001062 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +00001063 return False
1064 return True
1065
1066
jadmanski0afbb632008-06-06 21:10:57 +00001067 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +00001068 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +00001069 have_reached_limit = False
1070 # iterate over copy, so we can remove agents during iteration
1071 for agent in list(self._agents):
1072 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +00001073 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +00001074 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +00001075 continue
1076 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +00001077 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +00001078 have_reached_limit):
1079 have_reached_limit = True
1080 continue
showard4c5374f2008-09-04 17:02:56 +00001081 num_started_this_cycle += agent.num_processes
1082 agent.tick()
showarda9435c02009-05-13 21:28:17 +00001083 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +00001084 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001085
1086
    def _process_recurring_runs(self):
        """Instantiate a job from each RecurringRun template whose start
        date has arrived, then reschedule or delete the template according
        to its loop settings."""
        recurring_runs = models.RecurringRun.objects.filter(
            start_date__lte=datetime.datetime.now())
        for rrun in recurring_runs:
            # Create job from template
            job = rrun.job
            info = rpc_utils.get_job_info(job)
            options = job.get_object_dict()

            host_objects = info['hosts']
            one_time_hosts = info['one_time_hosts']
            metahost_objects = info['meta_hosts']
            # NOTE(review): 'dependencies' is extracted but never passed to
            # create_new_job below -- confirm whether it should be forwarded
            dependencies = info['dependencies']
            atomic_group = info['atomic_group']

            for host in one_time_hosts or []:
                this_host = models.Host.create_one_time_host(host.hostname)
                host_objects.append(this_host)

            try:
                rpc_utils.create_new_job(owner=rrun.owner.login,
                                         options=options,
                                         host_objects=host_objects,
                                         metahost_objects=metahost_objects,
                                         atomic_group=atomic_group)

            except Exception, ex:
                # a bad template should not kill the scheduler cycle
                logging.exception(ex)
                #TODO send email

            if rrun.loop_count == 1:
                # that was the final scheduled run; drop the template
                rrun.delete()
            else:
                if rrun.loop_count != 0: # if not infinite loop
                    # calculate new start_date
                    difference = datetime.timedelta(seconds=rrun.loop_period)
                    rrun.start_date = rrun.start_date + difference
                    rrun.loop_count -= 1
                    rrun.save()
1126
1127
class PidfileRunMonitor(object):
    """
    Monitors a single autoserv-style process through its pidfile.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process())
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prepend a 'nice' invocation when a nonzero nice level is given."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # used by _handle_no_process() to time out pidfile creation
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start a new process via the drone manager and monitor it."""
        assert command is not None
        # use the helper rather than duplicating its logic inline (the
        # original re-implemented the nice prefix here, leaving the helper
        # dead code)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, pidfile_name=pidfile_name,
            log_file=log_file, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_path,
                                   pidfile_name=_AUTOSERV_PID_FILE):
        """Monitor an already-running process via its pidfile under
        execution_path."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(
            execution_path, pidfile_name=pidfile_name)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """True once a process has been identified from the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; only valid when has_process()."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile contents.

        Raises _PidfileException (after resetting state) when the pidfile
        contents are invalid.
        """
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # notify by email, then treat the process as lost
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the original bound the exception to an unused variable
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the process exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001283
1284
class Agent(object):
    """
    An agent for use by the Dispatcher class to perform a sequence of tasks.

    The following methods are required on all task objects:
        poll() - Called periodically to let the task check its status and
                update its internal state.
        is_done() - Returns True if the task is finished.
        abort() - Called when an abort has been requested.  The task must
                set its aborted attribute to True if it actually aborted.

    The following attributes are required on all task objects:
        aborted - bool, True if this task was aborted.
        failure_tasks - A sequence of tasks to be run using a new Agent
                by the dispatcher should this task fail.
        success - bool, True if this task succeeded.
        queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
        host_ids - A sequence of Host ids this task represents.

    The following attribute is written to all task objects:
        agent - A reference to the Agent instance that the task has been
                added to.
    """


    def __init__(self, tasks, num_processes=1):
        """
        @param tasks: A list of tasks as described in the class docstring.
        @param num_processes: The number of subprocesses the Agent represents.
                This is used by the Dispatcher for managing the load on the
                system.  Defaults to 1.
        """
        self.active_task = None
        self.queue = None
        self.dispatcher = None  # filled in by Dispatcher.add_agent()
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        # flatten the per-task id sequences into one deduplicated set
        return set(itertools.chain.from_iterable(id_lists))


    def add_task(self, task):
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """Drive tasks forward, stopping at the first still-running one."""
        while not self.is_done():
            task = self.active_task
            if task:
                task.poll()
                if not task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        """Retire the finished active task and pull the next off the queue."""
        logging.info("agent picking task")
        current = self.active_task
        if current is not None:
            assert current.is_done()
            if not current.success:
                self.on_task_failure()
            self.active_task = None

        if not self.is_done():
            self.active_task = self.queue.get_nowait()


    def on_task_failure(self):
        """Drop the remaining queued tasks and hand the failed task's
        failure_tasks to a brand-new Agent, so host_ids and queue_entry_ids
        get recomputed for it."""
        self._clear_queue()
        failure_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(failure_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def abort(self):
        """Abort tasks until the queue drains or a task refuses the abort."""
        while not self.is_done():
            if not self.active_task:
                self._next_task()
            self.active_task.abort()
            if not self.active_task.aborted:
                # tasks can choose to ignore aborts
                return
            self.active_task = None
1392
showardd3dc1992009-04-22 21:01:40 +00001393
class DelayedCallTask(object):
    """
    A task object like AgentTask for an Agent to run that waits for the
    specified amount of time to have elapsed before calling the supplied
    callback once and finishing.  If the callback returns anything, it is
    assumed to be a new Agent instance and will be added to the dispatcher.

    @attribute end_time: The absolute posix time after which this task will
            call its callback when it is polled and be finished.

    Also has all attributes required by the Agent class.
    """
    def __init__(self, delay_seconds, callback, now_func=None):
        """
        @param delay_seconds: The delay in seconds from now that this task
                will call the supplied callback and be done.
        @param callback: A callable to be called by this task once after at
                least delay_seconds time has elapsed.  It must return None
                or a new Agent instance.
        @param now_func: A time.time like function.  Default: time.time.
                Used for testing.
        """
        assert delay_seconds > 0
        assert callable(callback)
        self._now_func = now_func or time.time
        self._callback = callback

        self.end_time = self._now_func() + delay_seconds

        # Attributes required by the Agent interface.
        self.aborted = False
        self.failure_tasks = ()
        self.host_ids = ()
        self.success = False
        self.queue_entry_ids = ()
        self.agent = None  # filled in by Agent.add_task()


    def poll(self):
        # Fire at most once: _callback is cleared after it runs.
        if not self._callback or self._now_func() < self.end_time:
            return
        agent_to_add = self._callback()
        if agent_to_add:
            self.agent.dispatcher.add_agent(agent_to_add)
        self._callback = None
        self.success = True


    def is_done(self):
        return not self._callback


    def abort(self):
        self.aborted = True
        self._callback = None
1451
1452
class AgentTask(object):
    """
    Base class for units of work run by an Agent.  A task optionally wraps a
    subprocess (launched through a PidfileRunMonitor) and exposes the
    poll()/is_done()/abort() interface that the Agent class requires.
    """
    def __init__(self, cmd=None, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None,
                 recover_run_monitor=None):
        """
        @param cmd: Command line (argument list) to execute, or None.
        @param working_directory: Directory the command executes in.
        @param failure_tasks: Tasks handed to a new Agent if this task fails.
                Defaults to no failure tasks.
        @param pidfile_name: Accepted for subclass compatibility; not stored
                here (run() takes its own pidfile_name argument).
        @param paired_with_pidfile: Accepted for subclass compatibility; not
                stored here.
        @param recover_run_monitor: An existing PidfileRunMonitor to adopt
                (used when recovering processes after a scheduler restart);
                when given, the task starts out already "started".
        """
        self.done = False
        # Use a None sentinel rather than a mutable default argument ([]),
        # which would be a single list shared by every AgentTask instance.
        self.failure_tasks = [] if failure_tasks is None else failure_tasks
        self.cmd = cmd
        self._working_directory = working_directory
        self.agent = None  # filled in by Agent.add_task()
        self.monitor = recover_run_monitor
        self.started = bool(recover_run_monitor)
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Record which hosts/queue entries this task touches, for the
        # dispatcher's scheduling and abort bookkeeping.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        # With no monitor (no cmd was launched), the task counts as failed;
        # otherwise wait until the process reports an exit code.
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # process still running
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        if self.done:
            return  # finish only once
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run just before the task starts; subclasses extend this."""
        assert not self.monitor


    def cleanup(self):
        # Preserve this task's log file in the results repository, if any.
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                    self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run after the task finishes; subclasses extend this."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def _get_consistent_execution_path(self, execution_entries):
        # All entries must agree on a single execution path; assert and
        # return it.
        first_execution_path = execution_entries[0].execution_path()
        for execution_entry in execution_entries[1:]:
            assert execution_entry.execution_path() == first_execution_path, (
                '%s (%s) != %s (%s)' % (execution_entry.execution_path(),
                                        execution_entry,
                                        first_execution_path,
                                        execution_entries[0]))
        return first_execution_path


    def _copy_results(self, execution_entries, use_monitor=None):
        """
        @param execution_entries: list of objects with execution_path() method
        """
        assert len(execution_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_path = self._get_consistent_execution_path(execution_entries)
        results_path = execution_path + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        # num_processes=0 -- presumably so parsing doesn't count against the
        # dispatcher's process load limit; confirm against Dispatcher.
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        assert not self.monitor
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001587
1588
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    _KEYVAL_FILE = 'keyval'
    def _format_keyval(self, key, value):
        """Render a single 'key=value' keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Subclasses must override this"""
        # Bug fix: this previously did "raise NotImplemented", which raises a
        # TypeError (NotImplemented is a constant, not an exception).  Use
        # NotImplementedError, matching PostJobTask._generate_command.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line to the job's keyval file on the drone."""
        assert self.monitor
        if not self.monitor.has_process():
            return  # no process to pair the write with
        _drone_manager.write_lines_to_file(
                self._keyval_path(), [self._format_keyval(field, value)],
                paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))
1616
1617
class SpecialAgentTask(AgentTask):
    """
    Subclass for AgentTasks that correspond to a SpecialTask entry in the DB.
    """

    TASK_TYPE = None  # subclasses must set this to a SpecialTask.Task value
    host = None
    queue_entry = None

    def __init__(self, task, extra_command_args, **kwargs):
        """
        @param task: the models.SpecialTask this AgentTask corresponds to, or
                None if no DB row exists yet (prolog() creates one).
        @param extra_command_args: extra autoserv command-line arguments for
                this task type (e.g. ['-v'] for verify).
        """
        assert self.host
        # Bug fix: this was "assert (cond, msg)", which asserts a non-empty
        # tuple and therefore always passed silently.
        assert self.TASK_TYPE is not None, 'self.TASK_TYPE must be overridden'
        self.task = task
        if task:
            kwargs['working_directory'] = task.execution_path()
        self._extra_command_args = extra_command_args
        super(SpecialAgentTask, self).__init__(**kwargs)


    def prolog(self):
        super(SpecialAgentTask, self).prolog()
        # Create/claim the DB row for this special task, then build the
        # command line now that the execution path is known.
        self.task = models.SpecialTask.prepare(self, self.task)
        self.cmd = _autoserv_command_line(self.host.hostname,
                                          self._extra_command_args,
                                          queue_entry=self.queue_entry)
        self._working_directory = self.task.execution_path()
        self.task.activate()


    def cleanup(self):
        super(SpecialAgentTask, self).cleanup()

        # self.task can be None if a SpecialAgentTask is aborted before the
        # prolog runs
        if self.task:
            self.task.finish()

        if self.monitor and self.monitor.has_process() and self.task:
            self._copy_results([self.task])
1658
1659
class RepairTask(SpecialAgentTask, TaskWithJobKeyvals):
    """Runs autoserv -R to repair a host, updating host (and optionally
    queue entry) state based on the outcome."""

    TASK_TYPE = models.SpecialTask.Task.REPAIR


    def __init__(self, host, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """\
        host: the host to repair.
        queue_entry: queue entry to mark failed if this repair fails.
        task: existing models.SpecialTask row, if any.
        recover_run_monitor: existing PidfileRunMonitor to adopt on
                scheduler-restart recovery.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = None
        # recovery code can pass a HQE that's already been requeued. for a
        # metahost, that means the host has been unassigned. in that case,
        # ignore the HQE.
        hqe_still_assigned_to_this_host = (queue_entry and queue_entry.host
                                           and queue_entry.host.id == host.id)
        if hqe_still_assigned_to_this_host:
            self.queue_entry = queue_entry

        super(RepairTask, self).__init__(
                task, ['-R', '--host-protection', protection],
                recover_run_monitor=recover_run_monitor)

        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)


    def prolog(self):
        super(RepairTask, self).prolog()
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry so it can be scheduled elsewhere while we repair
        if self.queue_entry:
            self.queue_entry.requeue()


    def _keyval_path(self):
        # keyvals live in this repair task's own working directory
        # (see TaskWithJobKeyvals)
        return os.path.join(self._working_directory, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Fail self.queue_entry, copying the repair logs into the job's
        results so users can see why the host was unusable."""
        assert self.queue_entry

        if self.queue_entry.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry.update_from_database()
        if self.queue_entry.status != 'Queued':
            return # entry has been aborted

        self.queue_entry.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
                self.monitor.get_process(),
                source_path=self._working_directory + '/',
                destination_path=self.queue_entry.execution_path() + '/')

        self._copy_results([self.queue_entry])
        # optionally parse the failed-repair results like normal job results
        if self.queue_entry.job.parse_failed_repair:
            self._parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001740
1741
class PreJobTask(SpecialAgentTask):
    """Base for special tasks that run against a host before a job.

    When the task fails for a concrete (non-metahost) queue entry, its
    autoserv debug log is copied into the entry's results so the user can
    see why the job never ran.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # Nothing to copy on success, without an entry, or for a metahost.
        if (self.success or not self.queue_entry
                or self.queue_entry.meta_host):
            return
        self.queue_entry.set_execution_subdir()
        task_path = self.task.execution_path()
        debug_log = os.path.join(task_path, 'debug', 'autoserv.DEBUG')
        destination = os.path.join(self.queue_entry.execution_path(),
                                   os.path.basename(task_path))
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), debug_log,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001757
1758
class VerifyTask(PreJobTask):
    """Runs autoserv -v against a host; on failure a RepairTask follows."""

    TASK_TYPE = models.SpecialTask.Task.VERIFY


    def __init__(self, queue_entry=None, host=None, task=None,
                 recover_run_monitor=None):
        # Exactly one of queue_entry/host must be supplied.
        assert bool(queue_entry) != bool(host)
        if host:
            self.host = host
        else:
            self.host = queue_entry.host
        self.queue_entry = queue_entry

        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(
                task, ['-v'], failure_tasks=repair_on_failure,
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()

        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')

        # One pending verify per host is enough -- delete any other queued
        # (not yet started) verify requests for this host.
        stale_verifies = (models.SpecialTask.objects
                          .filter(host__id=self.host.id,
                                  task=models.SpecialTask.Task.VERIFY,
                                  is_active=False, is_complete=False)
                          .exclude(id=self.task.id))
        stale_verifies.delete()


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001799
1800
class CleanupHostsMixin(object):
    """Mixin that releases hosts after a job, optionally via a reboot/cleanup.

    NOTE(review): _reboot_hosts reads self._final_status, which this mixin
    does not define.  PostJobTask sets _final_status; confirm that every
    other consumer (e.g. QueueTask._finish_task's fallback path) also
    defines it before this is called.
    """
    def _reboot_hosts(self, job, queue_entries, final_success,
                      num_tests_failed):
        """Mark each entry's host Ready, or schedule a CleanupTask for it.

        @param job: the job the entries belong to (supplies reboot_after).
        @param queue_entries: the HostQueueEntries whose hosts to release.
        @param final_success: whether the job ultimately succeeded.
        @param num_tests_failed: number of failed tests in the job.
        """
        reboot_after = job.reboot_after
        do_reboot = (
                # always reboot after aborted jobs
                self._final_status == models.HostQueueEntry.Status.ABORTED
                or reboot_after == models.RebootAfter.ALWAYS
                or (reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED
                    and final_success and num_tests_failed == 0))

        for queue_entry in queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')
1820
1821
class QueueTask(AgentTask, TaskWithJobKeyvals, CleanupHostsMixin):
    """
    Task that runs the main autoserv job process for a group of queue entries
    and records job-level keyvals before and after the run.
    """
    def __init__(self, job, queue_entries, cmd=None, group_name='',
                 recover_run_monitor=None):
        # job: the Job being run.  queue_entries: its HostQueueEntries; all
        # of them share a single execution path (see _execution_path).
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(
                cmd, self._execution_path(),
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # the job-level keyval file lives at the root of the execution path
        return os.path.join(self._execution_path(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path, to be materialized
        when the job process is launched on a drone."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_path(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # per-host keyvals (platform + labels) under host_keyvals/<hostname>
        keyval_path = os.path.join(self._execution_path(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_path(self):
        # all queue entries of the group share one execution path
        return self.queue_entries[0].execution_path()


    def prolog(self):
        """Record queued-time keyvals and mark entries and hosts Running."""
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_path(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        """Post-run bookkeeping: record job_finished, hand off to log
        gathering (or release hosts), and flag lost processes."""
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
        else:
            # NOTE(review): _reboot_hosts (CleanupHostsMixin) reads
            # self._final_status, which QueueTask does not appear to define
            # -- confirm this branch is reachable and safe.
            self._reboot_hosts(self.job, self.queue_entries,
                               final_success=False, num_tests_failed=0)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # append an INFO line to the job's status.log on the drone
        _drone_manager.write_lines_to_file(
                os.path.join(self._execution_path(), 'status.log'),
                ['INFO\t----\t----\t' + comment],
                paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001955
1956
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after the autoserv process for a group of
    queue entries has finished (e.g. gathering logs, final reparse).  The
    task's own process is paired with the original autoserv pidfile so its
    output lands in the same execution directory.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 recover_run_monitor=None):
        """
        @param queue_entries: HostQueueEntries sharing one execution path.
        @param pidfile_name: pidfile name for this task's own process.
        @param logfile_name: name of this task's log file within the
                execution path.
        @param recover_run_monitor: existing PidfileRunMonitor to adopt when
                recovering after a scheduler restart.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_path = self._get_consistent_execution_path(
                queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_path)
        # monitor for the *autoserv* process whose results we're processing
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_path)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(
                cmd=command, working_directory=self._execution_path,
                recover_run_monitor=recover_run_monitor)

        self.log_file = os.path.join(self._execution_path, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        """Return the command line to run; subclasses must override."""
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the queue entries were aborted.

        All entries are expected to agree; on inconsistency, send a warning
        email and assume aborted.
        """
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: the previous code passed a single formatted string
                # to ', '.join(), which joined its *characters*.  List every
                # entry's state instead.
                entries_description = ', '.join(
                        '%s (%s)' % (entry, entry.aborted)
                        for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                        'Inconsistent abort state',
                        'Queue entries have inconsistent abort state: ' +
                        entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                    'No results in post-job task',
                    'No results in post-job task at %s' %
                    self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
                pidfile_name=self._pidfile_name,
                paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
2038
2039
class GatherLogsTask(PostJobTask, CleanupHostsMixin):
    """
    Post-job task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, recover_run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
                queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
                logfile_name='.collect_crashinfo.log',
                recover_run_monitor=recover_run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        hostnames = ','.join(entry.host.hostname
                             for entry in self._queue_entries)
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m', hostnames,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        monitor = self._autoserv_monitor
        if monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=monitor)

        final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
        self._reboot_hosts(self._job, self._queue_entries, final_success,
                           monitor.num_tests_failed())


    def run(self):
        exit_code = self._autoserv_monitor.exit_code()
        # crash info is only needed when Autoserv died from a signal; a
        # missing exit code is assumed to be just as bad.
        exited_cleanly = (exit_code is not None
                          and not os.WIFSIGNALED(exit_code))
        if exited_cleanly:
            self.finished(True)
        else:
            super(GatherLogsTask, self).run()
showard0bbfc212009-04-29 21:06:13 +00002090
2091
class CleanupTask(PreJobTask):
    TASK_TYPE = models.SpecialTask.Task.CLEANUP


    def __init__(self, host=None, queue_entry=None, task=None,
                 recover_run_monitor=None):
        """Cleanup for a bare host or for a queue entry's host (not both)."""
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        # if cleanup fails, fall back to repairing the host
        fallback_repair = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(
                task, ['--cleanup'], failure_tasks=[fallback_repair],
                recover_run_monitor=recover_run_monitor)

        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
2124
2125
class FinalReparseTask(PostJobTask):
    """Runs the results parser over a finished job's results directory,
    throttled by a class-wide cap on concurrent parse processes."""

    # class-wide count of currently-running parse processes
    _num_running_parses = 0

    def __init__(self, queue_entries, recover_run_monitor=None):
        super(FinalReparseTask, self).__init__(
                queue_entries, pidfile_name=_PARSER_PID_FILE,
                logfile_name='.parse.log',
                recover_run_monitor=recover_run_monitor)
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = self.started


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        """True while we're below the configured parse-process limit."""
        limit = scheduler_config.config.max_parse_processes
        return cls._num_running_parses < limit


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        self._set_all_statuses(self._final_status)


    def _generate_command(self, results_dir):
        """Build the results-parser command line."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
                results_dir]


    def tick(self):
        # until the parse actually starts, keep retrying whenever the
        # running-parse count permits; afterwards, default behavior applies
        if not self._parse_started:
            self._try_starting_parse()
            return
        super(FinalReparseTask, self).tick()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        if not self._can_run_new_parse():
            return

        # actually launch the parse command
        super(FinalReparseTask, self).run()

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a slot if we ever claimed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002199
2200
class SetEntryPendingTask(AgentTask):
    """No-op-command task that moves its queue entry along via on_pending()."""
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        # on_pending() may hand back a follow-up agent to be scheduled
        next_agent = self._queue_entry.on_pending()
        if next_agent:
            self.agent.dispatcher.add_agent(next_agent)
        self.finished(True)
2213
2214
class DBError(Exception):
    """Raised by the DBObject constructor when its database select fails."""
2217
2218
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load (or reuse) an instance backed by a database row.

        @param id - Database id to fetch; mutually exclusive with row.
        @param row - A pre-fetched row matching the class _fields.
        @param new_record - True for a row not yet saved to the database.
        @param always_query - If False, a cached already-initialized
                instance is returned without re-querying the database.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                logging.warn(
                    'initialized %s %s instance requery is updating: %s',
                    type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Return the database row for row_id; raises DBError if missing."""
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            raise DBError("row not found (table=%s, row id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        # the row must line up one-to-one with _fields
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # id is the immutable key; never allow update_field() on it
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this instance's fields from its database row."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value):
        """Set field to value both on this instance and in the database."""
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return  # no change, skip the UPDATE

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this instance if it was constructed with new_record=True."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            values = []
            for key in keys:
                value = getattr(self, key)
                if value is None:
                    values.append('NULL')
                else:
                    values.append('"%s"' % value)
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row from the database and evict it from the cache."""
        # Bug fix: the cache key must use self.id (this row's database id).
        # The original passed the builtin id function, so the pop never
        # matched and deleted instances lingered in the cache, letting a
        # later constructor hand back a stale, deleted object.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Prepend prefix when string is non-empty; pass falsy values back."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002417
mbligh36768f02008-02-22 18:28:33 +00002418
class IneligibleHostQueue(DBObject):
    """ORM wrapper for ineligible_host_queues: job/host block records."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002422
2423
class AtomicGroup(DBObject):
    """ORM wrapper for the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines',
               'invalid')
showard89f84db2009-03-12 20:39:13 +00002428
2429
class Label(DBObject):
    """ORM wrapper for the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform',
               'invalid', 'only_if_needed', 'atomic_group_id')


    def __repr__(self):
        """Debug representation showing name, id and atomic group."""
        template = 'Label(name=%r, id=%d, atomic_group_id=%r)'
        return template % (self.name, self.id, self.atomic_group_id)
2439
2440
class Host(DBObject):
    """ORM wrapper for the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def set_status(self, status):
        """Set the host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [cleanup, verify] tasks used to re-verify this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)

        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]


    _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)


    @classmethod
    def cmp_for_sort(cls, a, b):
        """
        A comparison function for sorting Host objects by hostname.

        This strips any trailing numeric digits, ignores leading 0s and
        compares hostnames by the leading name and the trailing digits as a
        number.  If both hostnames do not match this pattern, they are simply
        compared as lower case strings.

        Example of how hostnames will be sorted:

            alice, host1, host2, host09, host010, host10, host11, yolkfolk

        This hopefully satisfy most people's hostname sorting needs regardless
        of their exact naming schemes.  Nobody sane should have both a host10
        and host010 (but the algorithm works regardless).
        """
        lower_a = a.hostname.lower()
        lower_b = b.hostname.lower()
        match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
        match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
        if match_a and match_b:
            name_a, number_a_str = match_a.groups()
            name_b, number_b_str = match_b.groups()
            # Bug fix: int() with the default base 10 already ignores
            # leading zeros.  The previous lstrip('0') approach raised
            # ValueError for all-zero digit strings such as "host0" or
            # "host000" because int('') is invalid.
            number_a = int(number_a_str)
            number_b = int(number_b_str)
            result = cmp((name_a, number_a), (name_b, number_b))
            if result == 0 and lower_a != lower_b:
                # If they compared equal above but the lower case names are
                # indeed different, don't report equality.  abc012 != abc12.
                return cmp(lower_a, lower_b)
            return result
        else:
            return cmp(lower_a, lower_b)
2519
2520
class HostQueueEntry(DBObject):
    """ORM wrapper for the host_queue_entries table."""
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002526
2527
showarda3c58572009-03-12 20:36:59 +00002528 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002529 assert id or row
showarda3c58572009-03-12 20:36:59 +00002530 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002531 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002532
jadmanski0afbb632008-06-06 21:10:57 +00002533 if self.host_id:
2534 self.host = Host(self.host_id)
2535 else:
2536 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002537
showard77182562009-06-10 00:16:05 +00002538 if self.atomic_group_id:
2539 self.atomic_group = AtomicGroup(self.atomic_group_id,
2540 always_query=False)
2541 else:
2542 self.atomic_group = None
2543
showard170873e2009-01-07 00:22:26 +00002544 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002545 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002546
2547
showard89f84db2009-03-12 20:39:13 +00002548 @classmethod
2549 def clone(cls, template):
2550 """
2551 Creates a new row using the values from a template instance.
2552
2553 The new instance will not exist in the database or have a valid
2554 id attribute until its save() method is called.
2555 """
2556 assert isinstance(template, cls)
2557 new_row = [getattr(template, field) for field in cls._fields]
2558 clone = cls(row=new_row, new_record=True)
2559 clone.id = None
2560 return clone
2561
2562
showardc85c21b2008-11-24 22:17:37 +00002563 def _view_job_url(self):
2564 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2565
2566
showardf1ae3542009-05-11 19:26:02 +00002567 def get_labels(self):
2568 """
2569 Get all labels associated with this host queue entry (either via the
2570 meta_host or as a job dependency label). The labels yielded are not
2571 guaranteed to be unique.
2572
2573 @yields Label instances associated with this host_queue_entry.
2574 """
2575 if self.meta_host:
2576 yield Label(id=self.meta_host, always_query=False)
2577 labels = Label.fetch(
2578 joins="JOIN jobs_dependency_labels AS deps "
2579 "ON (labels.id = deps.label_id)",
2580 where="deps.job_id = %d" % self.job.id)
2581 for label in labels:
2582 yield label
2583
2584
jadmanski0afbb632008-06-06 21:10:57 +00002585 def set_host(self, host):
2586 if host:
2587 self.queue_log_record('Assigning host ' + host.hostname)
2588 self.update_field('host_id', host.id)
2589 self.update_field('active', True)
2590 self.block_host(host.id)
2591 else:
2592 self.queue_log_record('Releasing host')
2593 self.unblock_host(self.host.id)
2594 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002595
jadmanski0afbb632008-06-06 21:10:57 +00002596 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002597
2598
jadmanski0afbb632008-06-06 21:10:57 +00002599 def get_host(self):
2600 return self.host
mbligh36768f02008-02-22 18:28:33 +00002601
2602
jadmanski0afbb632008-06-06 21:10:57 +00002603 def queue_log_record(self, log_line):
2604 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002605 _drone_manager.write_lines_to_file(self.queue_log_path,
2606 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002607
2608
jadmanski0afbb632008-06-06 21:10:57 +00002609 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002610 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002611 row = [0, self.job.id, host_id]
2612 block = IneligibleHostQueue(row=row, new_record=True)
2613 block.save()
mblighe2586682008-02-29 22:45:46 +00002614
2615
jadmanski0afbb632008-06-06 21:10:57 +00002616 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002617 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002618 blocks = IneligibleHostQueue.fetch(
2619 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2620 for block in blocks:
2621 block.delete()
mblighe2586682008-02-29 22:45:46 +00002622
2623
showard2bab8f42008-11-12 18:15:22 +00002624 def set_execution_subdir(self, subdir=None):
2625 if subdir is None:
2626 assert self.get_host()
2627 subdir = self.get_host().hostname
2628 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002629
2630
showard6355f6b2008-12-05 18:52:13 +00002631 def _get_hostname(self):
2632 if self.host:
2633 return self.host.hostname
2634 return 'no host'
2635
2636
showard170873e2009-01-07 00:22:26 +00002637 def __str__(self):
2638 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2639
2640
jadmanski0afbb632008-06-06 21:10:57 +00002641 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002642 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002643
showardb18134f2009-03-20 20:52:18 +00002644 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002645
showardc85c21b2008-11-24 22:17:37 +00002646 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002647 self.update_field('complete', False)
2648 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002649
jadmanski0afbb632008-06-06 21:10:57 +00002650 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002651 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002652 self.update_field('complete', False)
2653 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002654
showardc85c21b2008-11-24 22:17:37 +00002655 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002656 self.update_field('complete', True)
2657 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002658
2659 should_email_status = (status.lower() in _notify_email_statuses or
2660 'all' in _notify_email_statuses)
2661 if should_email_status:
2662 self._email_on_status(status)
2663
2664 self._email_on_job_complete()
2665
2666
2667 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002668 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002669
2670 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2671 self.job.id, self.job.name, hostname, status)
2672 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2673 self.job.id, self.job.name, hostname, status,
2674 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002675 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002676
2677
2678 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002679 if not self.job.is_finished():
2680 return
showard542e8402008-09-19 20:16:18 +00002681
showardc85c21b2008-11-24 22:17:37 +00002682 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002683 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002684 for queue_entry in hosts_queue:
2685 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002686 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002687 queue_entry.status))
2688
2689 summary_text = "\n".join(summary_text)
2690 status_counts = models.Job.objects.get_status_counts(
2691 [self.job.id])[self.job.id]
2692 status = ', '.join('%d %s' % (count, status) for status, count
2693 in status_counts.iteritems())
2694
2695 subject = 'Autotest: Job ID: %s "%s" %s' % (
2696 self.job.id, self.job.name, status)
2697 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2698 self.job.id, self.job.name, status, self._view_job_url(),
2699 summary_text)
showard170873e2009-01-07 00:22:26 +00002700 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002701
2702
showard77182562009-06-10 00:16:05 +00002703 def run_pre_job_tasks(self, assigned_host=None):
2704 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002705 assert assigned_host
2706 # ensure results dir exists for the queue log
showard08356c12009-06-15 20:24:01 +00002707 if self.host_id is None:
2708 self.set_host(assigned_host)
2709 else:
2710 assert assigned_host.id == self.host_id
mbligh36768f02008-02-22 18:28:33 +00002711
showardcfd4a7e2009-07-11 01:47:33 +00002712 logging.info("%s/%s/%s scheduled on %s, status=%s",
showardb18134f2009-03-20 20:52:18 +00002713 self.job.name, self.meta_host, self.atomic_group_id,
2714 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002715
showard77182562009-06-10 00:16:05 +00002716 return self._do_run_pre_job_tasks()
2717
2718
2719 def _do_run_pre_job_tasks(self):
2720 # Every host goes thru the Verifying stage (which may or may not
2721 # actually do anything as determined by get_pre_job_tasks).
2722 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2723
2724 # The pre job tasks always end with a SetEntryPendingTask which
2725 # will continue as appropriate through queue_entry.on_pending().
2726 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002727
showard6ae5ea92009-02-25 00:11:51 +00002728
jadmanski0afbb632008-06-06 21:10:57 +00002729 def requeue(self):
showardcfd4a7e2009-07-11 01:47:33 +00002730 assert self.host
jadmanski0afbb632008-06-06 21:10:57 +00002731 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002732 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002733 # verify/cleanup failure sets the execution subdir, so reset it here
2734 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002735 if self.meta_host:
2736 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002737
2738
jadmanski0afbb632008-06-06 21:10:57 +00002739 def handle_host_failure(self):
2740 """\
2741 Called when this queue entry's host has failed verification and
2742 repair.
2743 """
2744 assert not self.meta_host
2745 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002746 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002747
2748
jadmanskif7fa2cc2008-10-01 14:13:23 +00002749 @property
2750 def aborted_by(self):
2751 self._load_abort_info()
2752 return self._aborted_by
2753
2754
2755 @property
2756 def aborted_on(self):
2757 self._load_abort_info()
2758 return self._aborted_on
2759
2760
2761 def _load_abort_info(self):
2762 """ Fetch info about who aborted the job. """
2763 if hasattr(self, "_aborted_by"):
2764 return
2765 rows = _db.execute("""
2766 SELECT users.login, aborted_host_queue_entries.aborted_on
2767 FROM aborted_host_queue_entries
2768 INNER JOIN users
2769 ON users.id = aborted_host_queue_entries.aborted_by_id
2770 WHERE aborted_host_queue_entries.queue_entry_id = %s
2771 """, (self.id,))
2772 if rows:
2773 self._aborted_by, self._aborted_on = rows[0]
2774 else:
2775 self._aborted_by = self._aborted_on = None
2776
2777
showardb2e2c322008-10-14 17:33:55 +00002778 def on_pending(self):
2779 """
2780 Called when an entry in a synchronous job has passed verify. If the
2781 job is ready to run, returns an agent to run the job. Returns None
2782 otherwise.
2783 """
2784 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002785 self.get_host().set_status('Pending')
showardb000a8d2009-07-28 20:02:07 +00002786
2787 # Some debug code here: sends an email if an asynchronous job does not
2788 # immediately enter Starting.
2789 # TODO: Remove this once we figure out why asynchronous jobs are getting
2790 # stuck in Pending.
2791 agent = self.job.run_if_ready(queue_entry=self)
2792 if self.job.synch_count == 1 and agent is None:
2793 subject = 'Job %s (id %s)' % (self.job.name, self.job.id)
2794 message = 'Asynchronous job stuck in Pending'
2795 email_manager.manager.enqueue_notify_email(subject, message)
2796 return agent
showardb2e2c322008-10-14 17:33:55 +00002797
2798
showardd3dc1992009-04-22 21:01:40 +00002799 def abort(self, dispatcher):
2800 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002801
showardd3dc1992009-04-22 21:01:40 +00002802 Status = models.HostQueueEntry.Status
2803 has_running_job_agent = (
2804 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2805 and dispatcher.get_agents_for_entry(self))
2806 if has_running_job_agent:
2807 # do nothing; post-job tasks will finish and then mark this entry
2808 # with status "Aborted" and take care of the host
2809 return
2810
2811 if self.status in (Status.STARTING, Status.PENDING):
2812 self.host.set_status(models.Host.Status.READY)
2813 elif self.status == Status.VERIFYING:
2814 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2815
2816 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002817
2818 def execution_tag(self):
2819 assert self.execution_subdir
mblighe7d9c602009-07-02 19:02:33 +00002820 return "%s/%s" % (self.job.tag(), self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002821
2822
showarded2afea2009-07-07 20:54:07 +00002823 def execution_path(self):
2824 return self.execution_tag()
2825
2826
class Job(DBObject):
    """One row of the `jobs` table plus the scheduling logic for it.

    A Job owns one HostQueueEntry per (meta-)host and decides when enough of
    them have reached Pending to actually launch autoserv on the group.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after',
               'parse_failed_repair', 'max_runtime_hrs')

    # This does not need to be a column in the DB.  The delays are likely to
    # be configured short.  If the scheduler is stopped and restarted in
    # the middle of a job's delay cycle, the delay cycle will either be
    # repeated or skipped depending on the number of Pending machines found
    # when the restarted scheduler recovers to track it.  Not a problem.
    #
    # A reference to the DelayedCallTask that will wake up the job should
    # no other HQEs change state in time.  Its end_time attribute is used
    # by our run_with_ready_delay() method to determine if the wait is over.
    _delay_ready_task = None

    # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
    # all status='Pending' atomic group HQEs incase a delay was running when the
    # scheduler was restarted and no more hosts ever successfully exit Verify.

    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 is the client-side type; everything else is run as a
        # server-side control file.
        return self.control_type != 2


    def tag(self):
        """Results-directory tag for this job, e.g. '42-owner'."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """@returns All HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job has at least one queue entry by construction.
        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status; optionally propagate it to all entries."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def _atomic_and_has_started(self):
        """
        @returns True if any of the HostQueueEntries associated with this job
        have entered the Status.STARTING state or beyond.
        """
        atomic_entries = models.HostQueueEntry.objects.filter(
                job=self.id, atomic_group__isnull=False)
        if atomic_entries.count() <= 0:
            return False

        # These states may *only* be reached if Job.run() has been called.
        started_statuses = (models.HostQueueEntry.Status.STARTING,
                            models.HostQueueEntry.Status.RUNNING,
                            models.HostQueueEntry.Status.COMPLETED)

        started_entries = atomic_entries.filter(status__in=started_statuses)
        return started_entries.count() > 0


    def _pending_count(self):
        """The number of HostQueueEntries for this job in the Pending state."""
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status=models.HostQueueEntry.Status.PENDING)
        return pending_entries.count()


    def is_ready(self):
        """@returns True if enough entries are Pending for the job to run."""
        # NOTE: Atomic group jobs stop reporting ready after they have been
        # started to avoid launching multiple copies of one atomic job.
        # Only possible if synch_count is less than half the number of
        # machines in the atomic group.
        pending_count = self._pending_count()
        atomic_and_has_started = self._atomic_and_has_started()
        # Reuse the value computed above; calling _atomic_and_has_started()
        # again would issue a second pair of DB count queries for no reason.
        ready = (pending_count >= self.synch_count
                 and not atomic_and_has_started)

        if not ready:
            logging.info(
                    'Job %s not ready: %s pending, %s required '
                    '(Atomic and started: %s)',
                    self, pending_count, self.synch_count,
                    atomic_and_has_started)

        return ready


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by a SQL
        clause (e.g. 'complete')."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """QuerySet of this job's entries that have not started running."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all Queued/Pending entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
                include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # Pending entries hold a host; release it back to Ready.
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job when fewer runnable entries remain than synch_count
        (the job could never gather enough hosts to run)."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines results file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self, group_name=''):
        """@returns a directory name to use for the next host group results."""
        if group_name:
            # Sanitize for use as a pathname.
            group_name = group_name.replace(os.path.sep, '_')
            if group_name.startswith('.'):
                group_name = '_' + group_name[1:]
            # Add a separator between the group name and 'group%d'.
            group_name += '.'
        group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
        query = models.HostQueueEntry.objects.filter(
                job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        group_matches = (group_count_re.match(subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in group_matches if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return '%sgroup%d' % (group_name, next_id)


    def _write_control_file(self, execution_tag):
        """Attach this job's control file to the execution dir on the drone
        and return the path it was written to."""
        control_path = _drone_manager.attach_file_to_execution(
                execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """All entries that share an execution_subdir with the given entry."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for the given group of entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
                hostnames,
                ['-P', execution_tag, '-n',
                 _drone_manager.absolute_path(control_path)],
                job=self, verbose=False)

        if not self.is_server_job():
            # Client-side jobs need the -c flag.
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Whether a CleanupTask should precede this entry's run."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def should_run_verify(self, queue_entry):
        """Whether a VerifyTask should precede this entry's run; hosts under
        DO_NOT_VERIFY protection are never verified."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def get_pre_job_tasks(self, queue_entry):
        """
        Get a list of tasks to perform before the host_queue_entry
        may be used to run this Job (such as Cleanup & Verify).

        @returns A list of tasks to be done to the given queue_entry before
                it should be considered be ready to run this job.  The last
                task in the list calls HostQueueEntry.on_pending(), which
                continues the flow of the job.
        """
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self.should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries, group_name=''):
        """Assign all given entries a fresh shared execution subdir."""
        if len(queue_entries) == 1:
            # Single-host groups use the hostname itself as the subdir.
            group_subdir_name = queue_entries[0].get_host().hostname
        else:
            group_subdir_name = self._next_group_name(group_name)
        logging.info('Running synchronous job %d hosts %s as %s',
                     self.id, [entry.host.hostname for entry in queue_entries],
                     group_subdir_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_subdir_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        @returns A tuple containing a list of HostQueueEntry instances to be
                used to run this Job, a string group name to suggest giving
                to this job in the results database.
        """
        atomic_group = include_queue_entry.atomic_group
        chosen_entries = [include_queue_entry]
        if atomic_group:
            num_entries_wanted = atomic_group.max_number_of_machines
        else:
            num_entries_wanted = self.synch_count
        num_entries_wanted -= len(chosen_entries)

        if num_entries_wanted > 0:
            where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
            pending_entries = list(HostQueueEntry.fetch(
                     where=where_clause,
                     params=(self.id, include_queue_entry.id)))

            # Sort the chosen hosts by hostname before slicing.
            def cmp_queue_entries_by_hostname(entry_a, entry_b):
                return Host.cmp_for_sort(entry_a.host, entry_b.host)
            pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
            chosen_entries += pending_entries[:num_entries_wanted]

        # Sanity check.  We'll only ever be called if this can be met.
        assert len(chosen_entries) >= self.synch_count

        if atomic_group:
            # Look at any meta_host and dependency labels and pick the first
            # one that also specifies this atomic group.  Use that label name
            # as the group name if possible (it is more specific).
            group_name = atomic_group.name
            for label in include_queue_entry.get_labels():
                if label.atomic_group_id:
                    assert label.atomic_group_id == atomic_group.id
                    group_name = label.name
                    break
        else:
            group_name = ''

        self._assign_new_group(chosen_entries, group_name=group_name)
        return chosen_entries, group_name


    def run_if_ready(self, queue_entry):
        """
        @returns An Agent instance to ultimately run this job if enough hosts
                are ready for it to run.
        @returns None and potentially cleans up excess hosts if this Job
                is not ready to run.
        """
        if not self.is_ready():
            self.stop_if_necessary()
            return None

        if queue_entry.atomic_group:
            return self.run_with_ready_delay(queue_entry)

        return self.run(queue_entry)


    def run_with_ready_delay(self, queue_entry):
        """
        Start a delay to wait for more hosts to enter Pending state before
        launching an atomic group job.  Once set, a delay cannot be reset.

        @param queue_entry: The HostQueueEntry object to get atomic group
                info from and pass to run_if_ready when the delay is up.

        @returns An Agent to run the job as appropriate or None if a delay
                has already been set.
        """
        assert queue_entry.job_id == self.id
        assert queue_entry.atomic_group
        delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
        pending_threshold = queue_entry.atomic_group.max_number_of_machines
        over_max_threshold = (self._pending_count() >= pending_threshold)
        delay_expired = (self._delay_ready_task and
                         time.time() >= self._delay_ready_task.end_time)

        # Delay is disabled or we already have enough?  Do not wait to run.
        if not delay or over_max_threshold or delay_expired:
            return self.run(queue_entry)

        # A delay was previously scheduled.
        if self._delay_ready_task:
            return None

        def run_job_after_delay():
            logging.info('Job %s done waiting for extra hosts.', self.id)
            return self.run(queue_entry)

        self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
                                                 callback=run_job_after_delay)

        return Agent([self._delay_ready_task], num_processes=0)


    def run(self, queue_entry):
        """
        @param queue_entry: The HostQueueEntry instance calling this method.
        @returns An Agent instance to run this job or None if we've already
                been run.
        """
        if queue_entry.atomic_group and self._atomic_and_has_started():
            logging.error('Job.run() called on running atomic Job %d '
                          'with HQE %s.', self.id, queue_entry)
            return None
        queue_entries, group_name = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries, group_name)


    def _finish_run(self, queue_entries, group_name):
        """Mark the chosen entries Starting and build the QueueTask Agent."""
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params, group_name=group_name)
        tasks = [queue_task]
        if self._delay_ready_task:
            # Cancel any pending callback that would try to run again
            # as we are already running.
            self._delay_ready_task.abort()

        return Agent(tasks, num_processes=len(queue_entries))


    def __str__(self):
        return '%s-%s' % (self.id, self.owner)
3219
3220
# Script entry point: run the scheduler's main() (defined earlier in this
# file) when executed directly rather than imported.
if __name__ == '__main__':
    main()