blob: ef2417353b140bbc31ce29abacbfa04dda5c1fb9 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showardef519212009-05-08 02:29:53 +00008import datetime, errno, optparse, os, pwd, Queue, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard844960a2009-05-29 18:41:18 +000017from autotest_lib.frontend.afe import models, rpc_utils, readonly_connection
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
mbligh36768f02008-02-22 18:28:33 +000023RESULTS_DIR = '.'
24AUTOSERV_NICE_LEVEL = 10
showard170873e2009-01-07 00:22:26 +000025DB_CONFIG_SECTION = 'AUTOTEST_WEB'
mbligh36768f02008-02-22 18:28:33 +000026AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')
27
28if os.environ.has_key('AUTOTEST_DIR'):
jadmanski0afbb632008-06-06 21:10:57 +000029 AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
mbligh36768f02008-02-22 18:28:33 +000030AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
31AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')
32
33if AUTOTEST_SERVER_DIR not in sys.path:
jadmanski0afbb632008-06-06 21:10:57 +000034 sys.path.insert(0, AUTOTEST_SERVER_DIR)
mbligh36768f02008-02-22 18:28:33 +000035
mbligh90a549d2008-03-25 23:52:34 +000036# how long to wait for autoserv to write a pidfile
37PIDFILE_TIMEOUT = 5 * 60 # 5 min
mblighbb421852008-03-11 22:36:16 +000038
showardd3dc1992009-04-22 21:01:40 +000039_AUTOSERV_PID_FILE = '.autoserv_execute'
40_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
41_PARSER_PID_FILE = '.parser_execute'
42
43_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
44 _PARSER_PID_FILE)
45
showard35162b02009-03-03 02:17:30 +000046# error message to leave in results dir when an autoserv process disappears
47# mysteriously
48_LOST_PROCESS_ERROR = """\
49Autoserv failed abnormally during execution for this job, probably due to a
50system error on the Autotest server. Full results may not be available. Sorry.
51"""
52
mbligh6f8bab42008-02-29 22:45:14 +000053_db = None
mbligh36768f02008-02-22 18:28:33 +000054_shutdown = False
showard170873e2009-01-07 00:22:26 +000055_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
56_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
mbligh4314a712008-02-29 22:44:30 +000057_testing_mode = False
showard542e8402008-09-19 20:16:18 +000058_base_url = None
showardc85c21b2008-11-24 22:17:37 +000059_notify_email_statuses = []
showard170873e2009-01-07 00:22:26 +000060_drone_manager = drone_manager.DroneManager()
mbligh36768f02008-02-22 18:28:33 +000061
showardb18134f2009-03-20 20:52:18 +000062# load the logging settings
63scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
showard50e463b2009-04-07 18:13:45 +000064if not os.environ.has_key('AUTOTEST_SCHEDULER_LOG_DIR'):
65 os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
showardb18134f2009-03-20 20:52:18 +000066# Here we export the log name, using the same convention as autoserv's results
67# directory.
mblighc9895aa2009-04-01 18:36:58 +000068if os.environ.has_key('AUTOTEST_SCHEDULER_LOG_NAME'):
69 scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
70else:
71 scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
72 os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
73
showardb18134f2009-03-20 20:52:18 +000074logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
75
mbligh36768f02008-02-22 18:28:33 +000076
mbligh83c1e9e2009-05-01 23:10:41 +000077def _site_init_monitor_db_dummy():
78 return {}
79
80
def main():
    """Entry point: run the scheduler, logging any stray exception.

    SystemExit is propagated untouched so deliberate exits (e.g. the
    scheduler being disabled in the config) do not get logged as errors.
    """
    try:
        main_without_exception_handling()
    except SystemExit:
        raise
    except:
        # log with traceback before letting the process die
        logging.exception('Exception escaping in monitor_db')
        raise
89
90
91def main_without_exception_handling():
jadmanski0afbb632008-06-06 21:10:57 +000092 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000093
jadmanski0afbb632008-06-06 21:10:57 +000094 parser = optparse.OptionParser(usage)
95 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
96 action='store_true')
97 parser.add_option('--logfile', help='Set a log file that all stdout ' +
98 'should be redirected to. Stderr will go to this ' +
99 'file + ".err"')
100 parser.add_option('--test', help='Indicate that scheduler is under ' +
101 'test and should use dummy autoserv and no parsing',
102 action='store_true')
103 (options, args) = parser.parse_args()
104 if len(args) != 1:
105 parser.print_usage()
106 return
mbligh36768f02008-02-22 18:28:33 +0000107
showard5613c662009-06-08 23:30:33 +0000108 scheduler_enabled = global_config.global_config.get_config_value(
109 scheduler_config.CONFIG_SECTION, 'enable_scheduler', type=bool)
110
111 if not scheduler_enabled:
112 msg = ("Scheduler not enabled, set enable_scheduler to true in the "
113 "global_config's SCHEDULER section to enabled it. Exiting.")
114 print msg
115 sys.exit(1)
116
jadmanski0afbb632008-06-06 21:10:57 +0000117 global RESULTS_DIR
118 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +0000119
mbligh83c1e9e2009-05-01 23:10:41 +0000120 site_init = utils.import_site_function(__file__,
121 "autotest_lib.scheduler.site_monitor_db", "site_init_monitor_db",
122 _site_init_monitor_db_dummy)
123 site_init()
124
showardcca334f2009-03-12 20:38:34 +0000125 # Change the cwd while running to avoid issues incase we were launched from
126 # somewhere odd (such as a random NFS home directory of the person running
127 # sudo to launch us as the appropriate user).
128 os.chdir(RESULTS_DIR)
129
jadmanski0afbb632008-06-06 21:10:57 +0000130 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +0000131 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
132 "notify_email_statuses",
133 default='')
showardc85c21b2008-11-24 22:17:37 +0000134 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +0000135 _notify_email_statuses = [status for status in
136 re.split(r'[\s,;:]', notify_statuses_list.lower())
137 if status]
showardc85c21b2008-11-24 22:17:37 +0000138
jadmanski0afbb632008-06-06 21:10:57 +0000139 if options.test:
140 global _autoserv_path
141 _autoserv_path = 'autoserv_dummy'
142 global _testing_mode
143 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000144
mbligh37eceaa2008-12-15 22:56:37 +0000145 # AUTOTEST_WEB.base_url is still a supported config option as some people
146 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000147 global _base_url
showard170873e2009-01-07 00:22:26 +0000148 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
149 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000150 if config_base_url:
151 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000152 else:
mbligh37eceaa2008-12-15 22:56:37 +0000153 # For the common case of everything running on a single server you
154 # can just set the hostname in a single place in the config file.
155 server_name = c.get_config_value('SERVER', 'hostname')
156 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000157 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000158 sys.exit(1)
159 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000160
showardc5afc462009-01-13 00:09:39 +0000161 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000162 server.start()
163
jadmanski0afbb632008-06-06 21:10:57 +0000164 try:
showardc5afc462009-01-13 00:09:39 +0000165 init(options.logfile)
166 dispatcher = Dispatcher()
showard915958d2009-04-22 21:00:58 +0000167 dispatcher.initialize(recover_hosts=options.recover_hosts)
showardc5afc462009-01-13 00:09:39 +0000168
jadmanski0afbb632008-06-06 21:10:57 +0000169 while not _shutdown:
170 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000171 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000172 except:
showard170873e2009-01-07 00:22:26 +0000173 email_manager.manager.log_stacktrace(
174 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000175
showard170873e2009-01-07 00:22:26 +0000176 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000177 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000178 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000179 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000180
181
def handle_sigint(signum, frame):
    """SIGINT handler: flag the main dispatcher loop to shut down cleanly."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000186
187
def init(logfile):
    """
    One-time scheduler startup.

    In order: optionally redirect stdout/stderr to logfile, write the
    monitor_db pid file, connect the scheduler and Django database
    connections, install the SIGINT handler and initialize the drone
    manager from the configured drone/results hosts.

    @param logfile - path to redirect output to, or a false value to skip
            redirection.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        # point at the stress-test database instead of the production one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # make the autotest server binaries visible to spawned processes
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()
    # bypass the readonly connection
    readonly_connection.ReadOnlyConnection.set_globally_disabled(True)

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000221
222
def enable_logging(logfile):
    """
    Redirect stdout to logfile and stderr to logfile + '.err'.

    Both files are opened unbuffered in append mode, and the underlying
    file descriptors are also redirected (via dup2) so that child
    processes inherit the redirection.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    # redirect at the file-descriptor level so subprocesses follow along
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    # and at the Python level for code writing to sys.stdout/sys.stderr
    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000235
236
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """
    Build the autoserv command line.

    @param machines - string - a machine or comma separated list of machines
            for the (-m) flag.
    @param results_dir - string - where the results will be written (-r).
    @param extra_args - list - additional arguments to pass to autoserv.
    @param job - Job object - if supplied, -u owner and -l name parameters
            are added.
    @param queue_entry - HostQueueEntry object - used to look up the Job when
            no job was supplied.

    @returns the command line as a list of executable + parameters.
    """
    command = [_autoserv_path, '-p', '-m', machines,
               '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        job = job or queue_entry.job
        command += ['-u', job.owner, '-l', job.name]
    return command + extra_args
258
259
class SchedulerError(Exception):
    """Raised by HostScheduler when scheduling state is inconsistent."""
262
263
class HostScheduler(object):
    """
    Assigns ready hosts to pending host queue entries.

    refresh() caches a per-cycle snapshot of scheduling state from the
    database (ready hosts, ACL memberships, label memberships, job
    dependency labels).  The find_eligible_* methods then consume hosts
    from that snapshot, popping each assigned host from _hosts_available
    so a host is handed out at most once per scheduling cycle.
    """
    def _get_ready_hosts(self):
        # A host is schedulable when it is unlocked, has status Ready (or
        # NULL) and has no currently active queue entry against it.
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # Render ids as "1,2,3" for interpolation into an SQL IN clause.
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """
        Execute query (which must contain a single %s placeholder for an
        id list) and return {first column: set(second column values)}, or
        the reversed mapping when flip is True.  Returns {} for an empty
        id_list without touching the database.
        """
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # Fold (left_id, right_id) rows into {left_id: set(right_ids)}
        # (or the flipped mapping when flip is True).
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # job id -> set of ACL group ids the job owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # job id -> set of host ids the job may never be scheduled on
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # host id -> set of ACL group ids granting access to the host
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """
        @returns a (label_id -> host_ids, host_id -> label_ids) pair of
        dicts built from a single hosts_labels query.
        """
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # label id -> Label object, for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Rebuild the cached scheduling snapshot for this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every label the job depends on must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """
        Reject hosts carrying an only_if_needed label unless the job
        explicitly asked for that label (as a dependency or as the
        metahost label).
        """
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_host_ids_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # Combine all per-host checks: ACL access, dependency labels,
        # only_if_needed labels and atomic group compatibility.
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # The entry names a specific host; hand it out only if it is
        # eligible and still available this cycle.
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts. They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # Pick any eligible, still-available host carrying the metahost label.
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """
        @returns an available Host for queue_entry, or None.  Atomic-group
        entries must go through find_eligible_atomic_group() instead.
        """
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = queue_entry.atomic_group
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_host_ids_in_group = self._get_eligible_host_ids_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts. The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_host_ids_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            eligible_hosts_in_group = [self._hosts_available[id]
                                       for id in eligible_host_ids_in_group]
            # So that they show up in a sane order when viewing the job.
            eligible_hosts_in_group.sort(cmp=Host.cmp_for_sort)

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = eligible_hosts_in_group[:max_hosts]

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host in eligible_hosts_in_group:
                hosts_in_label.discard(host.id)
                self._hosts_available.pop(host.id)
                host_list.append(host)
            return host_list

        return []
613
614
showard170873e2009-01-07 00:22:26 +0000615class Dispatcher(object):
    def __init__(self):
        # all live Agents, in the order they were added
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id / queue entry id -> set of Agents; kept in sync by
        # add_agent()/remove_agent() for fast per-object lookup
        self._host_agents = {}
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000626
mbligh36768f02008-02-22 18:28:33 +0000627
    def initialize(self, recover_hosts=True):
        """
        One-time startup: prime the cleanup tasks and recover state left
        over from a previous scheduler instance.

        @param recover_hosts - whether to also attempt dead-host recovery.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000637
638
    def tick(self):
        """
        Run one scheduler iteration.  Phase order matters: refresh drone
        state first, handle aborts/reverifies/recurring runs, schedule new
        work, advance all agents, then flush pending drone actions and
        queued email last.
        """
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._find_reverify()
        self._process_recurring_runs()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000649
showard97aed502008-11-04 02:01:24 +0000650
    def _run_cleanup(self):
        # each cleanup object decides internally whether its interval has
        # elapsed, so calling every tick is cheap
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000654
mbligh36768f02008-02-22 18:28:33 +0000655
showard170873e2009-01-07 00:22:26 +0000656 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
657 for object_id in object_ids:
658 agent_dict.setdefault(object_id, set()).add(agent)
659
660
661 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
662 for object_id in object_ids:
663 assert object_id in agent_dict
664 agent_dict[object_id].remove(agent)
665
666
jadmanski0afbb632008-06-06 21:10:57 +0000667 def add_agent(self, agent):
668 self._agents.append(agent)
669 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000670 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
671 self._register_agent_for_ids(self._queue_entry_agents,
672 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000673
showard170873e2009-01-07 00:22:26 +0000674
675 def get_agents_for_entry(self, queue_entry):
676 """
677 Find agents corresponding to the specified queue_entry.
678 """
showardd3dc1992009-04-22 21:01:40 +0000679 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000680
681
682 def host_has_agent(self, host):
683 """
684 Determine if there is currently an Agent present using this host.
685 """
686 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000687
688
jadmanski0afbb632008-06-06 21:10:57 +0000689 def remove_agent(self, agent):
690 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000691 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
692 agent)
693 self._unregister_agent_for_ids(self._queue_entry_agents,
694 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000695
696
jadmanski0afbb632008-06-06 21:10:57 +0000697 def _recover_processes(self):
showard170873e2009-01-07 00:22:26 +0000698 self._register_pidfiles()
699 _drone_manager.refresh()
showardd3dc1992009-04-22 21:01:40 +0000700 self._recover_all_recoverable_entries()
showard170873e2009-01-07 00:22:26 +0000701 self._requeue_other_active_entries()
showard170873e2009-01-07 00:22:26 +0000702 self._reverify_remaining_hosts()
703 # reinitialize drones after killing orphaned processes, since they can
704 # leave around files when they die
705 _drone_manager.execute_actions()
706 _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000707
showard170873e2009-01-07 00:22:26 +0000708
709 def _register_pidfiles(self):
710 # during recovery we may need to read pidfiles for both running and
711 # parsing entries
712 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000713 where="status IN ('Running', 'Gathering', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000714 for queue_entry in queue_entries:
showardd3dc1992009-04-22 21:01:40 +0000715 for pidfile_name in _ALL_PIDFILE_NAMES:
716 pidfile_id = _drone_manager.get_pidfile_id_from(
717 queue_entry.execution_tag(), pidfile_name=pidfile_name)
718 _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000719
720
showardd3dc1992009-04-22 21:01:40 +0000721 def _recover_entries_with_status(self, status, orphans, pidfile_name,
722 recover_entries_fn):
723 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000724 for queue_entry in queue_entries:
725 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000726 # synchronous job we've already recovered
727 continue
showardd3dc1992009-04-22 21:01:40 +0000728 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000729 execution_tag = queue_entry.execution_tag()
730 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000731 run_monitor.attach_to_existing_process(execution_tag,
732 pidfile_name=pidfile_name)
showard597bfd32009-05-08 18:22:50 +0000733
734 log_message = ('Recovering %s entry %s ' %
735 (status.lower(),
736 ', '.join(str(entry) for entry in queue_entries)))
showard170873e2009-01-07 00:22:26 +0000737 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000738 # execution apparently never happened
showard597bfd32009-05-08 18:22:50 +0000739 logging.info(log_message + 'without process')
showardd3dc1992009-04-22 21:01:40 +0000740 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000741 continue
mbligh90a549d2008-03-25 23:52:34 +0000742
showard597bfd32009-05-08 18:22:50 +0000743 logging.info(log_message + '(process %s)',
showardd3dc1992009-04-22 21:01:40 +0000744 run_monitor.get_process())
745 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
746 orphans.discard(run_monitor.get_process())
747
748
749 def _kill_remaining_orphan_processes(self, orphans):
750 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000751 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000752 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000753
showard170873e2009-01-07 00:22:26 +0000754
showardd3dc1992009-04-22 21:01:40 +0000755 def _recover_running_entries(self, orphans):
756 def recover_entries(job, queue_entries, run_monitor):
757 if run_monitor is not None:
758 queue_task = RecoveryQueueTask(job=job,
759 queue_entries=queue_entries,
760 run_monitor=run_monitor)
761 self.add_agent(Agent(tasks=[queue_task],
762 num_processes=len(queue_entries)))
763 # else, _requeue_other_active_entries will cover this
764
765 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
766 orphans, '.autoserv_execute',
767 recover_entries)
768
769
770 def _recover_gathering_entries(self, orphans):
771 def recover_entries(job, queue_entries, run_monitor):
772 gather_task = GatherLogsTask(job, queue_entries,
773 run_monitor=run_monitor)
774 self.add_agent(Agent([gather_task]))
775
776 self._recover_entries_with_status(
777 models.HostQueueEntry.Status.GATHERING,
778 orphans, _CRASHINFO_PID_FILE, recover_entries)
779
780
781 def _recover_parsing_entries(self, orphans):
782 def recover_entries(job, queue_entries, run_monitor):
783 reparse_task = FinalReparseTask(queue_entries,
784 run_monitor=run_monitor)
785 self.add_agent(Agent([reparse_task], num_processes=0))
786
787 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
788 orphans, _PARSER_PID_FILE,
789 recover_entries)
790
791
    def _recover_all_recoverable_entries(self):
        """Recover Running/Gathering/Parsing entries, then kill any
        autoserv processes nothing claimed.  Each recovery step discards
        the processes it adopts from the orphan set.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000798
showard97aed502008-11-04 02:01:24 +0000799
showard170873e2009-01-07 00:22:26 +0000800 def _requeue_other_active_entries(self):
801 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000802 where='active AND NOT complete AND '
803 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000804 for queue_entry in queue_entries:
805 if self.get_agents_for_entry(queue_entry):
806 # entry has already been recovered
807 continue
showardd3dc1992009-04-22 21:01:40 +0000808 if queue_entry.aborted:
809 queue_entry.abort(self)
810 continue
811
812 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000813 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000814 if queue_entry.host:
815 tasks = queue_entry.host.reverify_tasks()
816 self.add_agent(Agent(tasks))
817 agent = queue_entry.requeue()
818
819
showard1ff7b2e2009-05-15 23:17:18 +0000820 def _find_reverify(self):
showard6d7b2ff2009-06-10 00:16:47 +0000821 tasks = models.SpecialTask.objects.filter(
822 task=models.SpecialTask.Task.REVERIFY, is_active=False,
823 is_complete=False)
824
825 host_ids = [str(task.host.id) for task in tasks]
826
827 if host_ids:
828 where = 'id IN (%s)' % ','.join(host_ids)
829 host_ids_reverifying = self._reverify_hosts_where(
830 where, cleanup=False)
831 tasks = tasks.filter(host__id__in=host_ids_reverifying)
832 for task in tasks:
833 task.is_active=True
834 task.save()
showard1ff7b2e2009-05-15 23:17:18 +0000835
836
showard170873e2009-01-07 00:22:26 +0000837 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000838 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000839 self._reverify_hosts_where("""(status = 'Repairing' OR
840 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000841 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000842
showard170873e2009-01-07 00:22:26 +0000843 # recover "Running" hosts with no active queue entries, although this
844 # should never happen
845 message = ('Recovering running host %s - this probably indicates a '
846 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000847 self._reverify_hosts_where("""status = 'Running' AND
848 id NOT IN (SELECT host_id
849 FROM host_queue_entries
850 WHERE active)""",
851 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000852
853
jadmanski0afbb632008-06-06 21:10:57 +0000854 def _reverify_hosts_where(self, where,
showarda64e52a2009-06-08 23:24:08 +0000855 print_message='Reverifying host %s',
856 cleanup=True):
showard170873e2009-01-07 00:22:26 +0000857 full_where='locked = 0 AND invalid = 0 AND ' + where
showard6d7b2ff2009-06-10 00:16:47 +0000858 host_ids_reverifying = []
showard170873e2009-01-07 00:22:26 +0000859 for host in Host.fetch(where=full_where):
860 if self.host_has_agent(host):
861 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000862 continue
showard170873e2009-01-07 00:22:26 +0000863 if print_message:
showardb18134f2009-03-20 20:52:18 +0000864 logging.info(print_message, host.hostname)
showarda64e52a2009-06-08 23:24:08 +0000865 tasks = host.reverify_tasks(cleanup)
showard170873e2009-01-07 00:22:26 +0000866 self.add_agent(Agent(tasks))
showard6d7b2ff2009-06-10 00:16:47 +0000867 host_ids_reverifying.append(host.id)
868 return host_ids_reverifying
mbligh36768f02008-02-22 18:28:33 +0000869
870
jadmanski0afbb632008-06-06 21:10:57 +0000871 def _recover_hosts(self):
872 # recover "Repair Failed" hosts
873 message = 'Reverifying dead host %s'
874 self._reverify_hosts_where("status = 'Repair Failed'",
875 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000876
877
showard04c82c52008-05-29 19:38:12 +0000878
showardb95b1bd2008-08-15 18:11:04 +0000879 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000880 # prioritize by job priority, then non-metahost over metahost, then FIFO
881 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000882 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000883 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000884 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000885
886
showard89f84db2009-03-12 20:39:13 +0000887 def _refresh_pending_queue_entries(self):
888 """
889 Lookup the pending HostQueueEntries and call our HostScheduler
890 refresh() method given that list. Return the list.
891
892 @returns A list of pending HostQueueEntries sorted in priority order.
893 """
showard63a34772008-08-18 19:32:50 +0000894 queue_entries = self._get_pending_queue_entries()
895 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000896 return []
showardb95b1bd2008-08-15 18:11:04 +0000897
showard63a34772008-08-18 19:32:50 +0000898 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000899
showard89f84db2009-03-12 20:39:13 +0000900 return queue_entries
901
902
903 def _schedule_atomic_group(self, queue_entry):
904 """
905 Schedule the given queue_entry on an atomic group of hosts.
906
907 Returns immediately if there are insufficient available hosts.
908
909 Creates new HostQueueEntries based off of queue_entry for the
910 scheduled hosts and starts them all running.
911 """
912 # This is a virtual host queue entry representing an entire
913 # atomic group, find a group and schedule their hosts.
914 group_hosts = self._host_scheduler.find_eligible_atomic_group(
915 queue_entry)
916 if not group_hosts:
917 return
918 # The first assigned host uses the original HostQueueEntry
919 group_queue_entries = [queue_entry]
920 for assigned_host in group_hosts[1:]:
921 # Create a new HQE for every additional assigned_host.
922 new_hqe = HostQueueEntry.clone(queue_entry)
923 new_hqe.save()
924 group_queue_entries.append(new_hqe)
925 assert len(group_queue_entries) == len(group_hosts)
926 for queue_entry, host in itertools.izip(group_queue_entries,
927 group_hosts):
928 self._run_queue_entry(queue_entry, host)
929
930
931 def _schedule_new_jobs(self):
932 queue_entries = self._refresh_pending_queue_entries()
933 if not queue_entries:
934 return
935
showard63a34772008-08-18 19:32:50 +0000936 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000937 if (queue_entry.atomic_group_id is None or
938 queue_entry.host_id is not None):
939 assigned_host = self._host_scheduler.find_eligible_host(
940 queue_entry)
941 if assigned_host:
942 self._run_queue_entry(queue_entry, assigned_host)
943 else:
944 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000945
946
947 def _run_queue_entry(self, queue_entry, host):
showard77182562009-06-10 00:16:05 +0000948 agent = queue_entry.run_pre_job_tasks(assigned_host=host)
949 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000950
951
jadmanski0afbb632008-06-06 21:10:57 +0000952 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000953 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
954 for agent in self.get_agents_for_entry(entry):
955 agent.abort()
956 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000957
958
showard324bf812009-01-20 23:23:38 +0000959 def _can_start_agent(self, agent, num_started_this_cycle,
960 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000961 # always allow zero-process agents to run
962 if agent.num_processes == 0:
963 return True
964 # don't allow any nonzero-process agents to run after we've reached a
965 # limit (this avoids starvation of many-process agents)
966 if have_reached_limit:
967 return False
968 # total process throttling
showard324bf812009-01-20 23:23:38 +0000969 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000970 return False
971 # if a single agent exceeds the per-cycle throttling, still allow it to
972 # run when it's the first agent in the cycle
973 if num_started_this_cycle == 0:
974 return True
975 # per-cycle throttling
976 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000977 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000978 return False
979 return True
980
981
jadmanski0afbb632008-06-06 21:10:57 +0000982 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000983 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000984 have_reached_limit = False
985 # iterate over copy, so we can remove agents during iteration
986 for agent in list(self._agents):
987 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000988 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000989 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000990 continue
991 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000992 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000993 have_reached_limit):
994 have_reached_limit = True
995 continue
showard4c5374f2008-09-04 17:02:56 +0000996 num_started_this_cycle += agent.num_processes
997 agent.tick()
showarda9435c02009-05-13 21:28:17 +0000998 logging.info('%d running processes',
showardb18134f2009-03-20 20:52:18 +0000999 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +00001000
1001
showard29f7cd22009-04-29 21:16:24 +00001002 def _process_recurring_runs(self):
1003 recurring_runs = models.RecurringRun.objects.filter(
1004 start_date__lte=datetime.datetime.now())
1005 for rrun in recurring_runs:
1006 # Create job from template
1007 job = rrun.job
1008 info = rpc_utils.get_job_info(job)
showarda9435c02009-05-13 21:28:17 +00001009 options = job.get_object_dict()
showard29f7cd22009-04-29 21:16:24 +00001010
1011 host_objects = info['hosts']
1012 one_time_hosts = info['one_time_hosts']
1013 metahost_objects = info['meta_hosts']
1014 dependencies = info['dependencies']
1015 atomic_group = info['atomic_group']
1016
1017 for host in one_time_hosts or []:
1018 this_host = models.Host.create_one_time_host(host.hostname)
1019 host_objects.append(this_host)
1020
1021 try:
1022 rpc_utils.create_new_job(owner=rrun.owner.login,
showarda9435c02009-05-13 21:28:17 +00001023 options=options,
showard29f7cd22009-04-29 21:16:24 +00001024 host_objects=host_objects,
1025 metahost_objects=metahost_objects,
showard29f7cd22009-04-29 21:16:24 +00001026 atomic_group=atomic_group)
1027
1028 except Exception, ex:
1029 logging.exception(ex)
1030 #TODO send email
1031
1032 if rrun.loop_count == 1:
1033 rrun.delete()
1034 else:
1035 if rrun.loop_count != 0: # if not infinite loop
1036 # calculate new start_date
1037 difference = datetime.timedelta(seconds=rrun.loop_period)
1038 rrun.start_date = rrun.start_date + difference
1039 rrun.loop_count -= 1
1040 rrun.save()
1041
1042
showard170873e2009-01-07 00:22:26 +00001043class PidfileRunMonitor(object):
1044 """
1045 Client must call either run() to start a new process or
1046 attach_to_existing_process().
1047 """
mbligh36768f02008-02-22 18:28:33 +00001048
showard170873e2009-01-07 00:22:26 +00001049 class _PidfileException(Exception):
1050 """
1051 Raised when there's some unexpected behavior with the pid file, but only
1052 used internally (never allowed to escape this class).
1053 """
mbligh36768f02008-02-22 18:28:33 +00001054
1055
showard170873e2009-01-07 00:22:26 +00001056 def __init__(self):
showard35162b02009-03-03 02:17:30 +00001057 self.lost_process = False
showard170873e2009-01-07 00:22:26 +00001058 self._start_time = None
1059 self.pidfile_id = None
1060 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +00001061
1062
showard170873e2009-01-07 00:22:26 +00001063 def _add_nice_command(self, command, nice_level):
1064 if not nice_level:
1065 return command
1066 return ['nice', '-n', str(nice_level)] + command
1067
1068
1069 def _set_start_time(self):
1070 self._start_time = time.time()
1071
1072
1073 def run(self, command, working_directory, nice_level=None, log_file=None,
1074 pidfile_name=None, paired_with_pidfile=None):
1075 assert command is not None
1076 if nice_level is not None:
1077 command = ['nice', '-n', str(nice_level)] + command
1078 self._set_start_time()
1079 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +00001080 command, working_directory, pidfile_name=pidfile_name,
1081 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +00001082
1083
showardd3dc1992009-04-22 21:01:40 +00001084 def attach_to_existing_process(self, execution_tag,
1085 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +00001086 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +00001087 self.pidfile_id = _drone_manager.get_pidfile_id_from(
1088 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +00001089 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +00001090
1091
jadmanski0afbb632008-06-06 21:10:57 +00001092 def kill(self):
showard170873e2009-01-07 00:22:26 +00001093 if self.has_process():
1094 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +00001095
mbligh36768f02008-02-22 18:28:33 +00001096
showard170873e2009-01-07 00:22:26 +00001097 def has_process(self):
showard21baa452008-10-21 00:08:39 +00001098 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +00001099 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +00001100
1101
showard170873e2009-01-07 00:22:26 +00001102 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001103 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001104 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001105 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001106
1107
showard170873e2009-01-07 00:22:26 +00001108 def _read_pidfile(self, use_second_read=False):
1109 assert self.pidfile_id is not None, (
1110 'You must call run() or attach_to_existing_process()')
1111 contents = _drone_manager.get_pidfile_contents(
1112 self.pidfile_id, use_second_read=use_second_read)
1113 if contents.is_invalid():
1114 self._state = drone_manager.PidfileContents()
1115 raise self._PidfileException(contents)
1116 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001117
1118
showard21baa452008-10-21 00:08:39 +00001119 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001120 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1121 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001122 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001123 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001124 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001125
1126
1127 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001128 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001129 return
mblighbb421852008-03-11 22:36:16 +00001130
showard21baa452008-10-21 00:08:39 +00001131 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001132
showard170873e2009-01-07 00:22:26 +00001133 if self._state.process is None:
1134 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001135 return
mbligh90a549d2008-03-25 23:52:34 +00001136
showard21baa452008-10-21 00:08:39 +00001137 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001138 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001139 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001140 return
mbligh90a549d2008-03-25 23:52:34 +00001141
showard170873e2009-01-07 00:22:26 +00001142 # pid but no running process - maybe process *just* exited
1143 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001144 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001145 # autoserv exited without writing an exit code
1146 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001147 self._handle_pidfile_error(
1148 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001149
showard21baa452008-10-21 00:08:39 +00001150
1151 def _get_pidfile_info(self):
1152 """\
1153 After completion, self._state will contain:
1154 pid=None, exit_status=None if autoserv has not yet run
1155 pid!=None, exit_status=None if autoserv is running
1156 pid!=None, exit_status!=None if autoserv has completed
1157 """
1158 try:
1159 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001160 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001161 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001162
1163
showard170873e2009-01-07 00:22:26 +00001164 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001165 """\
1166 Called when no pidfile is found or no pid is in the pidfile.
1167 """
showard170873e2009-01-07 00:22:26 +00001168 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001169 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001170 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1171 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001172 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001173 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001174
1175
showard35162b02009-03-03 02:17:30 +00001176 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001177 """\
1178 Called when autoserv has exited without writing an exit status,
1179 or we've timed out waiting for autoserv to write a pid to the
1180 pidfile. In either case, we just return failure and the caller
1181 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001182
showard170873e2009-01-07 00:22:26 +00001183 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001184 """
1185 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001186 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001187 self._state.exit_status = 1
1188 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001189
1190
jadmanski0afbb632008-06-06 21:10:57 +00001191 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001192 self._get_pidfile_info()
1193 return self._state.exit_status
1194
1195
1196 def num_tests_failed(self):
1197 self._get_pidfile_info()
1198 assert self._state.num_tests_failed is not None
1199 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001200
1201
mbligh36768f02008-02-22 18:28:33 +00001202class Agent(object):
showard77182562009-06-10 00:16:05 +00001203 """
1204 An agent for use by the Dispatcher class to perform a sequence of tasks.
1205
1206 The following methods are required on all task objects:
1207 poll() - Called periodically to let the task check its status and
1208 update its internal state. If the task succeeded.
1209 is_done() - Returns True if the task is finished.
1210 abort() - Called when an abort has been requested. The task must
1211 set its aborted attribute to True if it actually aborted.
1212
1213 The following attributes are required on all task objects:
1214 aborted - bool, True if this task was aborted.
1215 failure_tasks - A sequence of tasks to be run using a new Agent
1216 by the dispatcher should this task fail.
1217 success - bool, True if this task succeeded.
1218 queue_entry_ids - A sequence of HostQueueEntry ids this task handles.
1219 host_ids - A sequence of Host ids this task represents.
1220
1221 The following attribute is written to all task objects:
1222 agent - A reference to the Agent instance that the task has been
1223 added to.
1224 """
1225
1226
showard170873e2009-01-07 00:22:26 +00001227 def __init__(self, tasks, num_processes=1):
showard77182562009-06-10 00:16:05 +00001228 """
1229 @param tasks: A list of tasks as described in the class docstring.
1230 @param num_processes: The number of subprocesses the Agent represents.
1231 This is used by the Dispatcher for managing the load on the
1232 system. Defaults to 1.
1233 """
jadmanski0afbb632008-06-06 21:10:57 +00001234 self.active_task = None
showardd3dc1992009-04-22 21:01:40 +00001235 self.queue = None
showard77182562009-06-10 00:16:05 +00001236 # This is filled in by Dispatcher.add_agent()
jadmanski0afbb632008-06-06 21:10:57 +00001237 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001238 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001239
showard170873e2009-01-07 00:22:26 +00001240 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1241 for task in tasks)
1242 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1243
showardd3dc1992009-04-22 21:01:40 +00001244 self._clear_queue()
jadmanski0afbb632008-06-06 21:10:57 +00001245 for task in tasks:
1246 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001247
1248
showardd3dc1992009-04-22 21:01:40 +00001249 def _clear_queue(self):
1250 self.queue = Queue.Queue(0)
1251
1252
showard170873e2009-01-07 00:22:26 +00001253 def _union_ids(self, id_lists):
1254 return set(itertools.chain(*id_lists))
1255
1256
jadmanski0afbb632008-06-06 21:10:57 +00001257 def add_task(self, task):
1258 self.queue.put_nowait(task)
1259 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001260
1261
jadmanski0afbb632008-06-06 21:10:57 +00001262 def tick(self):
showard21baa452008-10-21 00:08:39 +00001263 while not self.is_done():
showard08a36412009-05-05 01:01:13 +00001264 if self.active_task:
showard21baa452008-10-21 00:08:39 +00001265 self.active_task.poll()
1266 if not self.active_task.is_done():
1267 return
1268 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001269
1270
jadmanski0afbb632008-06-06 21:10:57 +00001271 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001272 logging.info("agent picking task")
showard08a36412009-05-05 01:01:13 +00001273 if self.active_task is not None:
jadmanski0afbb632008-06-06 21:10:57 +00001274 assert self.active_task.is_done()
jadmanski0afbb632008-06-06 21:10:57 +00001275 if not self.active_task.success:
1276 self.on_task_failure()
showard08a36412009-05-05 01:01:13 +00001277 self.active_task = None
mblighe2586682008-02-29 22:45:46 +00001278
jadmanski0afbb632008-06-06 21:10:57 +00001279 if not self.is_done():
1280 self.active_task = self.queue.get_nowait()
mbligh36768f02008-02-22 18:28:33 +00001281
1282
jadmanski0afbb632008-06-06 21:10:57 +00001283 def on_task_failure(self):
showardd3dc1992009-04-22 21:01:40 +00001284 self._clear_queue()
showardccbd6c52009-03-21 00:10:21 +00001285 # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
1286 # get reset.
1287 new_agent = Agent(self.active_task.failure_tasks)
1288 self.dispatcher.add_agent(new_agent)
mbligh16c722d2008-03-05 00:58:44 +00001289
mblighe2586682008-02-29 22:45:46 +00001290
showard4c5374f2008-09-04 17:02:56 +00001291 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001292 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001293
1294
jadmanski0afbb632008-06-06 21:10:57 +00001295 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001296 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001297
1298
showardd3dc1992009-04-22 21:01:40 +00001299 def abort(self):
showard08a36412009-05-05 01:01:13 +00001300 # abort tasks until the queue is empty or a task ignores the abort
1301 while not self.is_done():
1302 if not self.active_task:
1303 self._next_task()
showardd3dc1992009-04-22 21:01:40 +00001304 self.active_task.abort()
showard08a36412009-05-05 01:01:13 +00001305 if not self.active_task.aborted:
1306 # tasks can choose to ignore aborts
showard20f9bdd2009-04-29 19:48:33 +00001307 return
1308 self.active_task = None
1309
showardd3dc1992009-04-22 21:01:40 +00001310
showard77182562009-06-10 00:16:05 +00001311class DelayedCallTask(object):
1312 """
1313 A task object like AgentTask for an Agent to run that waits for the
1314 specified amount of time to have elapsed before calling the supplied
1315 callback once and finishing. If the callback returns anything, it is
1316 assumed to be a new Agent instance and will be added to the dispatcher.
1317
1318 @attribute end_time: The absolute posix time after which this task will
1319 call its callback when it is polled and be finished.
1320
1321 Also has all attributes required by the Agent class.
1322 """
1323 def __init__(self, delay_seconds, callback, now_func=None):
1324 """
1325 @param delay_seconds: The delay in seconds from now that this task
1326 will call the supplied callback and be done.
1327 @param callback: A callable to be called by this task once after at
1328 least delay_seconds time has elapsed. It must return None
1329 or a new Agent instance.
1330 @param now_func: A time.time like function. Default: time.time.
1331 Used for testing.
1332 """
1333 assert delay_seconds > 0
1334 assert callable(callback)
1335 if not now_func:
1336 now_func = time.time
1337 self._now_func = now_func
1338 self._callback = callback
1339
1340 self.end_time = self._now_func() + delay_seconds
1341
1342 # These attributes are required by Agent.
1343 self.aborted = False
1344 self.failure_tasks = ()
1345 self.host_ids = ()
1346 self.success = False
1347 self.queue_entry_ids = ()
1348 # This is filled in by Agent.add_task().
1349 self.agent = None
1350
1351
1352 def poll(self):
1353 if self._callback and self._now_func() >= self.end_time:
1354 new_agent = self._callback()
1355 if new_agent:
1356 self.agent.dispatcher.add_agent(new_agent)
1357 self._callback = None
1358 self.success = True
1359
1360
1361 def is_done(self):
1362 return not self._callback
1363
1364
1365 def abort(self):
1366 self.aborted = True
1367 self._callback = None
1368
1369
class AgentTask(object):
    """
    Base class for units of work run by an Agent on behalf of the
    dispatcher.

    The dispatcher drives a task through poll() until is_done() returns
    True.  Subclasses override prolog()/epilog() for setup/teardown and
    may override run() to control how the command is launched.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command (argv list) to run, or None for tasks that
                run no external process.
        @param working_directory: directory the command runs in.
        @param failure_tasks: tasks for the Agent to run if this one
                fails.  Defaults to no tasks.
        @param pidfile_name: accepted for signature compatibility; not
                used here (run() takes its own pidfile_name).
        @param paired_with_pidfile: accepted for signature compatibility;
                not used here.
        """
        # Use None as the default rather than a mutable [] -- a shared
        # list default would be aliased across all instances.
        if failure_tasks is None:
            failure_tasks = []
        self.done = False
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # Record which hosts/queue entries this task affects so the
        # dispatcher can find and abort the right agents.
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        if not self.started:
            self.start()
        self.tick()


    def tick(self):
        # Check the monitored process; once it exits, finish with its
        # exit status.  A started task with no monitor is treated as a
        # failure.
        if self.monitor:
            exit_code = self.monitor.exit_code()
            if exit_code is None:
                return  # process still running
            success = (exit_code == 0)
        else:
            success = False

        self.finished(success)


    def is_done(self):
        return self.done


    def finished(self, success):
        # Idempotent: epilog() runs only on the first call.
        if self.done:
            return
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: suffix is accepted for caller convenience but is not
        # used; the drone manager names the temporary path itself.
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # Preserve this task's log in the results repository, if a
        # process actually ran and a log file was configured.
        if self.monitor and self.monitor.has_process() and self.log_file:
            _drone_manager.copy_to_results_repository(
                    self.monitor.get_process(), self.log_file)


    def epilog(self):
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # Timestamp the filename so repeated runs against the same host
        # don't overwrite each other's logs.
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # All queue entries in a group must share one execution tag.
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_results(self, queue_entries, use_monitor=None):
        """
        Copy this task's results to the results repository.

        @param queue_entries: non-empty list of entries whose (shared)
                execution tag determines the destination.
        @param use_monitor: monitor for the process whose results to
                copy; defaults to self.monitor.
        """
        assert len(queue_entries) > 0
        if use_monitor is None:
            assert self.monitor
            use_monitor = self.monitor
        assert use_monitor.has_process()
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(use_monitor.get_process(),
                                                  results_path)


    def _parse_results(self, queue_entries):
        # num_processes=0: parsing is throttled separately by
        # FinalReparseTask, so don't count it against the process limit.
        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def _copy_and_parse_results(self, queue_entries, use_monitor=None):
        self._copy_results(queue_entries, use_monitor)
        self._parse_results(queue_entries)


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001509
1510
class TaskWithJobKeyvals(object):
    """AgentTask mixin providing functionality to help with job keyval files."""
    # Name of the keyval file within an execution's results directory.
    _KEYVAL_FILE = 'keyval'

    def _format_keyval(self, key, value):
        """Render one keyval line as written to the keyval file."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Return the path of the keyval file.  Subclasses must override."""
        # Raise NotImplementedError (the exception type); the original
        # raised the NotImplemented singleton, which is not an exception
        # and produced a TypeError instead.
        raise NotImplementedError


    def _write_keyval_after_job(self, field, value):
        # Only write if the job's process actually ran -- the keyval is
        # paired with that process on its drone.
        assert self.monitor
        if not self.monitor.has_process():
            return
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _job_queued_keyval(self, job):
        """Return the (key, value) pair recording when the job was queued."""
        return 'job_queued', int(time.mktime(job.created_on.timetuple()))


    def _write_job_finished(self):
        self._write_keyval_after_job("job_finished", int(time.time()))
1538
1539
class RepairTask(AgentTask, TaskWithJobKeyvals):
    """
    Runs 'autoserv -R' to repair a host.  If a queue entry was supplied
    and the repair fails, that entry is marked failed (unless it is a
    metahost entry or has since been aborted).
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # Requeue the entry up front; if the repair later fails, epilog()
        # will pull it back out via _fail_queue_entry().
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _keyval_path(self):
        # Required override from TaskWithJobKeyvals.
        return os.path.join(self.temp_results_dir, self._KEYVAL_FILE)


    def _fail_queue_entry(self):
        """Mark the associated queue entry failed and publish its results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        queued_key, queued_time = self._job_queued_keyval(
            self.queue_entry_to_fail.job)
        self._write_keyval_after_job(queued_key, queued_time)
        self._write_job_finished()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_results([self.queue_entry_to_fail])
        # parsing of failed-repair results is optional, per job setting
        if self.queue_entry_to_fail.job.parse_failed_repair:
            self._parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()

        # The repair attempt, successful or not, completes any active
        # special tasks recorded for this host.
        tasks = models.SpecialTask.objects.filter(host__id=self.host.id,
                                                  is_active=True)
        for task in tasks:
            task.is_complete = True
            task.save()

        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001617
1618
showard8fe93b52008-11-18 17:53:22 +00001619class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001620 def epilog(self):
1621 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001622 should_copy_results = (self.queue_entry and not self.success
1623 and not self.queue_entry.meta_host)
1624 if should_copy_results:
1625 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001626 destination = os.path.join(self.queue_entry.execution_tag(),
1627 os.path.basename(self.log_file))
1628 _drone_manager.copy_to_results_repository(
1629 self.monitor.get_process(), self.log_file,
1630 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001631
1632
class VerifyTask(PreJobTask):
    """
    Runs 'autoserv -v' to check that a host is in working order.

    Exactly one of queue_entry or host must be supplied.  On failure a
    RepairTask is queued as the follow-up task; on success the host's
    active special tasks are completed and the host is marked Ready.
    """
    def __init__(self, queue_entry=None, host=None):
        # exactly one of the two arguments must be provided
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host if host else queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        # if verification fails, fall back to repairing the host
        repair = RepairTask(self.host, queue_entry=queue_entry)
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=[repair])

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if not self.success:
            return

        # close out any active special tasks recorded for this host
        for special_task in models.SpecialTask.objects.filter(
                host__id=self.host.id, is_active=True):
            special_task.is_complete = True
            special_task.save()

        self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001669
1670
class QueueTask(AgentTask, TaskWithJobKeyvals):
    """
    Runs the main autoserv process for a group of queue entries belonging
    to one job, writing job/host keyvals before the run and spawning
    log-gathering on completion.
    """
    def __init__(self, job, queue_entries, cmd, group_name=''):
        self.job = job
        self.queue_entries = queue_entries
        self.group_name = group_name
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _keyval_path(self):
        # Required override from TaskWithJobKeyvals.
        return os.path.join(self._execution_tag(), self._KEYVAL_FILE)


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        # Attach the rendered keyvals to the execution so they land in the
        # results before/with the job's own output.
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_host_keyvals(self, host):
        # Per-host keyvals (platform + labels) go under host_keyvals/<host>.
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # All entries in the group share one tag; use the first.
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        # Record queue time (and group name, if any) before the job runs.
        queued_key, queued_time = self._job_queued_keyval(self.job)
        keyval_dict = {queued_key: queued_time}
        if self.group_name:
            keyval_dict['host_group_name'] = self.group_name
        self._write_keyvals_before_job(keyval_dict)
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.update_field('started_on', datetime.datetime.now())
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self):
        # Runs from both epilog() and abort(); does nothing if no process
        # monitor was ever attached.
        if not self.monitor:
            return

        self._write_job_finished()

        # both of these conditionals can be true, iff the process ran, wrote a
        # pid to its pidfile, and then exited without writing an exit code
        if self.monitor.has_process():
            gather_task = GatherLogsTask(self.job, self.queue_entries)
            self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        # Append an INFO line to the job's status.log on the drone.
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        # Record who aborted the job and when, both as keyvals and as a
        # status.log comment.  Skipped if no process ever ran.
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            # no user recorded -- attribute the abort to the system
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task()


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task()
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001798
1799
class RecoveryQueueTask(QueueTask):
    """
    A QueueTask recovered at scheduler startup: the autoserv process is
    already running, so we attach its existing run monitor instead of
    launching a new process.
    """
    def __init__(self, job, queue_entries, run_monitor):
        """
        @param run_monitor: monitor already attached to the running
                autoserv process.
        """
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.monitor = run_monitor
        self.started = True
        # since we set started=True here, prolog() and run() shouldn't be called


    def run(self):
        # NotImplementedError is the proper exception type here; the
        # NotImplemented singleton is not an exception and is not callable.
        raise NotImplementedError('This should never be called')


    def prolog(self):
        raise NotImplementedError('This should never be called')
mblighbb421852008-03-11 22:36:16 +00001814
1815
class PostJobTask(AgentTask):
    """
    Base class for tasks that run after a job's autoserv process has
    exited (log gathering, final reparse).  Attaches to the job's
    existing autoserv pidfile to determine the final entry status.
    """
    def __init__(self, queue_entries, pidfile_name, logfile_name,
                 run_monitor=None):
        """
        If run_monitor != None, we're recovering a running task.
        """
        self._queue_entries = queue_entries
        self._pidfile_name = pidfile_name

        self._execution_tag = self._get_consistent_execution_tag(queue_entries)
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the job's autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._paired_with_pidfile = self._autoserv_monitor.pidfile_id

        if _testing_mode:
            command = 'true'
        else:
            command = self._generate_command(self._results_dir)

        super(PostJobTask, self).__init__(cmd=command,
                                          working_directory=self._execution_tag)
        # this must happen *after* the super call
        self.monitor = run_monitor
        if run_monitor:
            self.started = True

        self.log_file = os.path.join(self._execution_tag, logfile_name)
        self._final_status = self._determine_final_status()


    def _generate_command(self, results_dir):
        raise NotImplementedError('Subclasses must override this')


    def _job_was_aborted(self):
        """Return True if the job's queue entries were aborted."""
        was_aborted = None
        for queue_entry in self._queue_entries:
            queue_entry.update_from_database()
            if was_aborted is None: # first queue entry
                was_aborted = bool(queue_entry.aborted)
            elif was_aborted != bool(queue_entry.aborted): # subsequent entries
                # Bug fix: the original joined the characters of a single
                # formatted string; list each entry's state instead.
                entries_description = ', '.join(
                    '%s (%s)' % (entry, entry.aborted)
                    for entry in self._queue_entries)
                email_manager.manager.enqueue_notify_email(
                    'Inconsistent abort state',
                    'Queue entries have inconsistent abort state: ' +
                    entries_description)
                # don't crash here, just assume true
                return True
        return was_aborted


    def _determine_final_status(self):
        if self._job_was_aborted():
            return models.HostQueueEntry.Status.ABORTED

        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def run(self):
        assert not self.monitor

        # make sure we actually have results to work with.
        # this should never happen in normal operation.
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results in post-job task',
                'No results in post-job task at %s' %
                self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        super(PostJobTask, self).run(
            pidfile_name=self._pidfile_name,
            paired_with_pidfile=self._paired_with_pidfile)


    def _set_all_statuses(self, status):
        for queue_entry in self._queue_entries:
            queue_entry.set_status(status)


    def abort(self):
        # override AgentTask.abort() to avoid killing the process and ending
        # the task. post-job tasks continue when the job is aborted.
        pass
1904
1905
class GatherLogsTask(PostJobTask):
    """
    Task responsible for
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        # collect crashinfo from every host the job ran on
        host_list = ','.join(queue_entry.host.hostname
                             for queue_entry in self._queue_entries)
        return [_autoserv_path , '-p', '--collect-crashinfo', '-m', host_list,
                '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release the job's hosts, as configured."""
        reboot_after = self._job.reboot_after
        do_reboot = False
        # aborted jobs always reboot; otherwise honor the job's
        # reboot_after setting
        if self._final_status == models.HostQueueEntry.Status.ABORTED:
            do_reboot = True
        elif reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            final_success = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            do_reboot = (final_success and num_tests_failed == 0)

        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        # copy/parse only if the autoserv process actually ran
        if self._autoserv_monitor.has_process():
            self._copy_and_parse_results(self._queue_entries,
                                         use_monitor=self._autoserv_monitor)
        self._reboot_hosts()


    def run(self):
        autoserv_exit_code = self._autoserv_monitor.exit_code()
        # only run if Autoserv exited due to some signal. if we have no exit
        # code, assume something bad (and signal-like) happened.
        if autoserv_exit_code is None or os.WIFSIGNALED(autoserv_exit_code):
            super(GatherLogsTask, self).run()
        else:
            self.finished(True)
showard0bbfc212009-04-29 21:06:13 +00001973
1974
showard8fe93b52008-11-18 17:53:22 +00001975class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001976 def __init__(self, host=None, queue_entry=None):
1977 assert bool(host) ^ bool(queue_entry)
1978 if queue_entry:
1979 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001980 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001981 self.host = host
showard170873e2009-01-07 00:22:26 +00001982
1983 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001984 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1985 ['--cleanup'],
1986 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001987 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001988 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1989 failure_tasks=[repair_task])
1990
1991 self._set_ids(host=host, queue_entries=[queue_entry])
1992 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001993
mblighd5c95802008-03-05 00:33:46 +00001994
jadmanski0afbb632008-06-06 21:10:57 +00001995 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001996 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001997 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001998 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001999
mblighd5c95802008-03-05 00:33:46 +00002000
showard21baa452008-10-21 00:08:39 +00002001 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00002002 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00002003 if self.success:
showardfa8629c2008-11-04 16:51:23 +00002004 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00002005 self.host.update_field('dirty', 0)
2006
2007
showardd3dc1992009-04-22 21:01:40 +00002008class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00002009 _num_running_parses = 0
2010
showardd3dc1992009-04-22 21:01:40 +00002011 def __init__(self, queue_entries, run_monitor=None):
2012 super(FinalReparseTask, self).__init__(queue_entries,
2013 pidfile_name=_PARSER_PID_FILE,
2014 logfile_name='.parse.log',
2015 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00002016 # don't use _set_ids, since we don't want to set the host_ids
2017 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard5add1c82009-05-26 19:27:46 +00002018 self._parse_started = (run_monitor is not None)
showard97aed502008-11-04 02:01:24 +00002019
showard97aed502008-11-04 02:01:24 +00002020
2021 @classmethod
2022 def _increment_running_parses(cls):
2023 cls._num_running_parses += 1
2024
2025
2026 @classmethod
2027 def _decrement_running_parses(cls):
2028 cls._num_running_parses -= 1
2029
2030
2031 @classmethod
2032 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00002033 return (cls._num_running_parses <
2034 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00002035
2036
2037 def prolog(self):
2038 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00002039 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00002040
2041
2042 def epilog(self):
2043 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00002044 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00002045
2046
showardd3dc1992009-04-22 21:01:40 +00002047 def _generate_command(self, results_dir):
mbligh9e936402009-05-13 20:42:17 +00002048 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o', '-P',
showardd3dc1992009-04-22 21:01:40 +00002049 results_dir]
showard97aed502008-11-04 02:01:24 +00002050
2051
showard08a36412009-05-05 01:01:13 +00002052 def tick(self):
2053 # override tick to keep trying to start until the parse count goes down
showard97aed502008-11-04 02:01:24 +00002054 # and we can, at which point we revert to default behavior
2055 if self._parse_started:
showard08a36412009-05-05 01:01:13 +00002056 super(FinalReparseTask, self).tick()
showard97aed502008-11-04 02:01:24 +00002057 else:
2058 self._try_starting_parse()
2059
2060
    def run(self):
        # override run() to not actually run unless we can -- the number of
        # concurrent parses is capped (see _can_run_new_parse); tick() will
        # retry until a slot frees up
        self._try_starting_parse()
2064
2065
2066 def _try_starting_parse(self):
2067 if not self._can_run_new_parse():
2068 return
showard170873e2009-01-07 00:22:26 +00002069
showard97aed502008-11-04 02:01:24 +00002070 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00002071 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00002072
showard97aed502008-11-04 02:01:24 +00002073 self._increment_running_parses()
2074 self._parse_started = True
2075
2076
    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # Only release a slot in the global parse count if we actually
        # claimed one by starting a parse.
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00002081
2082
showardc9ae1782009-01-30 01:42:37 +00002083class SetEntryPendingTask(AgentTask):
2084 def __init__(self, queue_entry):
2085 super(SetEntryPendingTask, self).__init__(cmd='')
2086 self._queue_entry = queue_entry
2087 self._set_ids(queue_entries=[queue_entry])
2088
2089
2090 def run(self):
2091 agent = self._queue_entry.on_pending()
2092 if agent:
2093 self.agent.dispatcher.add_agent(agent)
2094 self.finished(True)
2095
2096
showarda3c58572009-03-12 20:36:59 +00002097class DBError(Exception):
2098 """Raised by the DBObject constructor when its select fails."""
2099
2100
mbligh36768f02008-02-22 18:28:33 +00002101class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00002102 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00002103
2104 # Subclasses MUST override these:
2105 _table_name = ''
2106 _fields = ()
2107
showarda3c58572009-03-12 20:36:59 +00002108 # A mapping from (type, id) to the instance of the object for that
2109 # particular id. This prevents us from creating new Job() and Host()
2110 # instances for every HostQueueEntry object that we instantiate as
2111 # multiple HQEs often share the same Job.
2112 _instances_by_type_and_id = weakref.WeakValueDictionary()
2113 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00002114
showarda3c58572009-03-12 20:36:59 +00002115
2116 def __new__(cls, id=None, **kwargs):
2117 """
2118 Look to see if we already have an instance for this particular type
2119 and id. If so, use it instead of creating a duplicate instance.
2120 """
2121 if id is not None:
2122 instance = cls._instances_by_type_and_id.get((cls, id))
2123 if instance:
2124 return instance
2125 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
2126
2127
2128 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00002129 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00002130 assert self._table_name, '_table_name must be defined in your class'
2131 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00002132 if not new_record:
2133 if self._initialized and not always_query:
2134 return # We've already been initialized.
2135 if id is None:
2136 id = row[0]
2137 # Tell future constructors to use us instead of re-querying while
2138 # this instance is still around.
2139 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00002140
showard6ae5ea92009-02-25 00:11:51 +00002141 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00002142
jadmanski0afbb632008-06-06 21:10:57 +00002143 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00002144
jadmanski0afbb632008-06-06 21:10:57 +00002145 if row is None:
showardccbd6c52009-03-21 00:10:21 +00002146 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00002147
showarda3c58572009-03-12 20:36:59 +00002148 if self._initialized:
2149 differences = self._compare_fields_in_row(row)
2150 if differences:
showard7629f142009-03-27 21:02:02 +00002151 logging.warn(
2152 'initialized %s %s instance requery is updating: %s',
2153 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00002154 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00002155 self._initialized = True
2156
2157
2158 @classmethod
2159 def _clear_instance_cache(cls):
2160 """Used for testing, clear the internal instance cache."""
2161 cls._instances_by_type_and_id.clear()
2162
2163
showardccbd6c52009-03-21 00:10:21 +00002164 def _fetch_row_from_db(self, row_id):
2165 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
2166 rows = _db.execute(sql, (row_id,))
2167 if not rows:
showard76e29d12009-04-15 21:53:10 +00002168 raise DBError("row not found (table=%s, row id=%s)"
2169 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00002170 return rows[0]
2171
2172
showarda3c58572009-03-12 20:36:59 +00002173 def _assert_row_length(self, row):
2174 assert len(row) == len(self._fields), (
2175 "table = %s, row = %s/%d, fields = %s/%d" % (
2176 self.__table, row, len(row), self._fields, len(self._fields)))
2177
2178
2179 def _compare_fields_in_row(self, row):
2180 """
2181 Given a row as returned by a SELECT query, compare it to our existing
2182 in memory fields.
2183
2184 @param row - A sequence of values corresponding to fields named in
2185 The class attribute _fields.
2186
2187 @returns A dictionary listing the differences keyed by field name
2188 containing tuples of (current_value, row_value).
2189 """
2190 self._assert_row_length(row)
2191 differences = {}
2192 for field, row_value in itertools.izip(self._fields, row):
2193 current_value = getattr(self, field)
2194 if current_value != row_value:
2195 differences[field] = (current_value, row_value)
2196 return differences
showard2bab8f42008-11-12 18:15:22 +00002197
2198
2199 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00002200 """
2201 Update our field attributes using a single row returned by SELECT.
2202
2203 @param row - A sequence of values corresponding to fields named in
2204 the class fields list.
2205 """
2206 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00002207
showard2bab8f42008-11-12 18:15:22 +00002208 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00002209 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00002210 setattr(self, field, value)
2211 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00002212
showard2bab8f42008-11-12 18:15:22 +00002213 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00002214
mblighe2586682008-02-29 22:45:46 +00002215
showardccbd6c52009-03-21 00:10:21 +00002216 def update_from_database(self):
2217 assert self.id is not None
2218 row = self._fetch_row_from_db(self.id)
2219 self._update_fields_from_row(row)
2220
2221
jadmanski0afbb632008-06-06 21:10:57 +00002222 def count(self, where, table = None):
2223 if not table:
2224 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00002225
jadmanski0afbb632008-06-06 21:10:57 +00002226 rows = _db.execute("""
2227 SELECT count(*) FROM %s
2228 WHERE %s
2229 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00002230
jadmanski0afbb632008-06-06 21:10:57 +00002231 assert len(rows) == 1
2232
2233 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00002234
2235
showardd3dc1992009-04-22 21:01:40 +00002236 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00002237 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00002238
showard2bab8f42008-11-12 18:15:22 +00002239 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00002240 return
mbligh36768f02008-02-22 18:28:33 +00002241
mblighf8c624d2008-07-03 16:58:45 +00002242 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00002243 _db.execute(query, (value, self.id))
2244
showard2bab8f42008-11-12 18:15:22 +00002245 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00002246
2247
jadmanski0afbb632008-06-06 21:10:57 +00002248 def save(self):
2249 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00002250 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00002251 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00002252 values = []
2253 for key in keys:
2254 value = getattr(self, key)
2255 if value is None:
2256 values.append('NULL')
2257 else:
2258 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00002259 values_str = ','.join(values)
2260 query = ('INSERT INTO %s (%s) VALUES (%s)' %
2261 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00002262 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00002263 # Update our id to the one the database just assigned to us.
2264 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00002265
2266
jadmanski0afbb632008-06-06 21:10:57 +00002267 def delete(self):
showarda3c58572009-03-12 20:36:59 +00002268 self._instances_by_type_and_id.pop((type(self), id), None)
2269 self._initialized = False
2270 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002271 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2272 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002273
2274
showard63a34772008-08-18 19:32:50 +00002275 @staticmethod
2276 def _prefix_with(string, prefix):
2277 if string:
2278 string = prefix + string
2279 return string
2280
2281
jadmanski0afbb632008-06-06 21:10:57 +00002282 @classmethod
showard989f25d2008-10-01 11:38:11 +00002283 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002284 """
2285 Construct instances of our class based on the given database query.
2286
2287 @yields One class instance for each row fetched.
2288 """
showard63a34772008-08-18 19:32:50 +00002289 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2290 where = cls._prefix_with(where, 'WHERE ')
2291 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002292 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002293 'joins' : joins,
2294 'where' : where,
2295 'order_by' : order_by})
2296 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002297 for row in rows:
2298 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002299
mbligh36768f02008-02-22 18:28:33 +00002300
class IneligibleHostQueue(DBObject):
    # A job/host pairing that must not be scheduled again; rows are created
    # by HostQueueEntry.block_host and removed by unblock_host.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002304
2305
showard89f84db2009-03-12 20:39:13 +00002306class AtomicGroup(DBObject):
2307 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002308 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2309 'invalid')
showard89f84db2009-03-12 20:39:13 +00002310
2311
showard989f25d2008-10-01 11:38:11 +00002312class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002313 _table_name = 'labels'
2314 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002315 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002316
2317
mbligh36768f02008-02-22 18:28:33 +00002318class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002319 _table_name = 'hosts'
2320 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2321 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2322
2323
jadmanski0afbb632008-06-06 21:10:57 +00002324 def current_task(self):
2325 rows = _db.execute("""
2326 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2327 """, (self.id,))
2328
2329 if len(rows) == 0:
2330 return None
2331 else:
2332 assert len(rows) == 1
2333 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002334 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002335
2336
jadmanski0afbb632008-06-06 21:10:57 +00002337 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002338 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002339 if self.current_task():
2340 self.current_task().requeue()
2341
showard6ae5ea92009-02-25 00:11:51 +00002342
jadmanski0afbb632008-06-06 21:10:57 +00002343 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002344 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002345 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002346
2347
showard170873e2009-01-07 00:22:26 +00002348 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002349 """
showard170873e2009-01-07 00:22:26 +00002350 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002351 """
2352 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002353 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002354 FROM labels
2355 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002356 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002357 ORDER BY labels.name
2358 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002359 platform = None
2360 all_labels = []
2361 for label_name, is_platform in rows:
2362 if is_platform:
2363 platform = label_name
2364 all_labels.append(label_name)
2365 return platform, all_labels
2366
2367
showarda64e52a2009-06-08 23:24:08 +00002368 def reverify_tasks(self, cleanup=True):
2369 tasks = [VerifyTask(host=self)]
showard6d7b2ff2009-06-10 00:16:47 +00002370 # just to make sure this host does not get taken away
2371 self.set_status('Verifying')
showarda64e52a2009-06-08 23:24:08 +00002372 if cleanup:
2373 tasks.insert(0, CleanupTask(host=self))
showard6d7b2ff2009-06-10 00:16:47 +00002374 self.set_status('Cleaning')
showarda64e52a2009-06-08 23:24:08 +00002375 return tasks
showardd8e548a2008-09-09 03:04:57 +00002376
2377
showard54c1ea92009-05-20 00:32:58 +00002378 _ALPHANUM_HOST_RE = re.compile(r'^([a-z-]+)(\d+)$', re.IGNORECASE)
2379
2380
2381 @classmethod
2382 def cmp_for_sort(cls, a, b):
2383 """
2384 A comparison function for sorting Host objects by hostname.
2385
2386 This strips any trailing numeric digits, ignores leading 0s and
2387 compares hostnames by the leading name and the trailing digits as a
2388 number. If both hostnames do not match this pattern, they are simply
2389 compared as lower case strings.
2390
2391 Example of how hostnames will be sorted:
2392
2393 alice, host1, host2, host09, host010, host10, host11, yolkfolk
2394
2395 This hopefully satisfy most people's hostname sorting needs regardless
2396 of their exact naming schemes. Nobody sane should have both a host10
2397 and host010 (but the algorithm works regardless).
2398 """
2399 lower_a = a.hostname.lower()
2400 lower_b = b.hostname.lower()
2401 match_a = cls._ALPHANUM_HOST_RE.match(lower_a)
2402 match_b = cls._ALPHANUM_HOST_RE.match(lower_b)
2403 if match_a and match_b:
2404 name_a, number_a_str = match_a.groups()
2405 name_b, number_b_str = match_b.groups()
2406 number_a = int(number_a_str.lstrip('0'))
2407 number_b = int(number_b_str.lstrip('0'))
2408 result = cmp((name_a, number_a), (name_b, number_b))
2409 if result == 0 and lower_a != lower_b:
2410 # If they compared equal above but the lower case names are
2411 # indeed different, don't report equality. abc012 != abc12.
2412 return cmp(lower_a, lower_b)
2413 return result
2414 else:
2415 return cmp(lower_a, lower_b)
2416
2417
mbligh36768f02008-02-22 18:28:33 +00002418class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002419 _table_name = 'host_queue_entries'
2420 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002421 'active', 'complete', 'deleted', 'execution_subdir',
showard12f3e322009-05-13 21:27:42 +00002422 'atomic_group_id', 'aborted', 'started_on')
showard6ae5ea92009-02-25 00:11:51 +00002423
2424
showarda3c58572009-03-12 20:36:59 +00002425 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002426 assert id or row
showarda3c58572009-03-12 20:36:59 +00002427 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002428 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002429
jadmanski0afbb632008-06-06 21:10:57 +00002430 if self.host_id:
2431 self.host = Host(self.host_id)
2432 else:
2433 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002434
showard77182562009-06-10 00:16:05 +00002435 if self.atomic_group_id:
2436 self.atomic_group = AtomicGroup(self.atomic_group_id,
2437 always_query=False)
2438 else:
2439 self.atomic_group = None
2440
showard170873e2009-01-07 00:22:26 +00002441 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002442 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002443
2444
showard89f84db2009-03-12 20:39:13 +00002445 @classmethod
2446 def clone(cls, template):
2447 """
2448 Creates a new row using the values from a template instance.
2449
2450 The new instance will not exist in the database or have a valid
2451 id attribute until its save() method is called.
2452 """
2453 assert isinstance(template, cls)
2454 new_row = [getattr(template, field) for field in cls._fields]
2455 clone = cls(row=new_row, new_record=True)
2456 clone.id = None
2457 return clone
2458
2459
showardc85c21b2008-11-24 22:17:37 +00002460 def _view_job_url(self):
2461 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2462
2463
showardf1ae3542009-05-11 19:26:02 +00002464 def get_labels(self):
2465 """
2466 Get all labels associated with this host queue entry (either via the
2467 meta_host or as a job dependency label). The labels yielded are not
2468 guaranteed to be unique.
2469
2470 @yields Label instances associated with this host_queue_entry.
2471 """
2472 if self.meta_host:
2473 yield Label(id=self.meta_host, always_query=False)
2474 labels = Label.fetch(
2475 joins="JOIN jobs_dependency_labels AS deps "
2476 "ON (labels.id = deps.label_id)",
2477 where="deps.job_id = %d" % self.job.id)
2478 for label in labels:
2479 yield label
2480
2481
jadmanski0afbb632008-06-06 21:10:57 +00002482 def set_host(self, host):
2483 if host:
2484 self.queue_log_record('Assigning host ' + host.hostname)
2485 self.update_field('host_id', host.id)
2486 self.update_field('active', True)
2487 self.block_host(host.id)
2488 else:
2489 self.queue_log_record('Releasing host')
2490 self.unblock_host(self.host.id)
2491 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002492
jadmanski0afbb632008-06-06 21:10:57 +00002493 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002494
2495
jadmanski0afbb632008-06-06 21:10:57 +00002496 def get_host(self):
2497 return self.host
mbligh36768f02008-02-22 18:28:33 +00002498
2499
jadmanski0afbb632008-06-06 21:10:57 +00002500 def queue_log_record(self, log_line):
2501 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002502 _drone_manager.write_lines_to_file(self.queue_log_path,
2503 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002504
2505
jadmanski0afbb632008-06-06 21:10:57 +00002506 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002507 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002508 row = [0, self.job.id, host_id]
2509 block = IneligibleHostQueue(row=row, new_record=True)
2510 block.save()
mblighe2586682008-02-29 22:45:46 +00002511
2512
jadmanski0afbb632008-06-06 21:10:57 +00002513 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002514 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002515 blocks = IneligibleHostQueue.fetch(
2516 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2517 for block in blocks:
2518 block.delete()
mblighe2586682008-02-29 22:45:46 +00002519
2520
showard2bab8f42008-11-12 18:15:22 +00002521 def set_execution_subdir(self, subdir=None):
2522 if subdir is None:
2523 assert self.get_host()
2524 subdir = self.get_host().hostname
2525 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002526
2527
showard6355f6b2008-12-05 18:52:13 +00002528 def _get_hostname(self):
2529 if self.host:
2530 return self.host.hostname
2531 return 'no host'
2532
2533
showard170873e2009-01-07 00:22:26 +00002534 def __str__(self):
2535 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2536
2537
jadmanski0afbb632008-06-06 21:10:57 +00002538 def set_status(self, status):
showardd3dc1992009-04-22 21:01:40 +00002539 self.update_field('status', status)
mblighf8c624d2008-07-03 16:58:45 +00002540
showardb18134f2009-03-20 20:52:18 +00002541 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002542
showardc85c21b2008-11-24 22:17:37 +00002543 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002544 self.update_field('complete', False)
2545 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002546
jadmanski0afbb632008-06-06 21:10:57 +00002547 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showardd3dc1992009-04-22 21:01:40 +00002548 'Gathering']:
jadmanski0afbb632008-06-06 21:10:57 +00002549 self.update_field('complete', False)
2550 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002551
showardc85c21b2008-11-24 22:17:37 +00002552 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002553 self.update_field('complete', True)
2554 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002555
2556 should_email_status = (status.lower() in _notify_email_statuses or
2557 'all' in _notify_email_statuses)
2558 if should_email_status:
2559 self._email_on_status(status)
2560
2561 self._email_on_job_complete()
2562
2563
2564 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002565 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002566
2567 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2568 self.job.id, self.job.name, hostname, status)
2569 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2570 self.job.id, self.job.name, hostname, status,
2571 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002572 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002573
2574
2575 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002576 if not self.job.is_finished():
2577 return
showard542e8402008-09-19 20:16:18 +00002578
showardc85c21b2008-11-24 22:17:37 +00002579 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002580 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002581 for queue_entry in hosts_queue:
2582 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002583 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002584 queue_entry.status))
2585
2586 summary_text = "\n".join(summary_text)
2587 status_counts = models.Job.objects.get_status_counts(
2588 [self.job.id])[self.job.id]
2589 status = ', '.join('%d %s' % (count, status) for status, count
2590 in status_counts.iteritems())
2591
2592 subject = 'Autotest: Job ID: %s "%s" %s' % (
2593 self.job.id, self.job.name, status)
2594 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2595 self.job.id, self.job.name, status, self._view_job_url(),
2596 summary_text)
showard170873e2009-01-07 00:22:26 +00002597 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002598
2599
showard77182562009-06-10 00:16:05 +00002600 def run_pre_job_tasks(self, assigned_host=None):
2601 if self.meta_host is not None or self.atomic_group:
jadmanski0afbb632008-06-06 21:10:57 +00002602 assert assigned_host
2603 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002604 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002605
showardb18134f2009-03-20 20:52:18 +00002606 logging.info("%s/%s/%s scheduled on %s, status=%s",
2607 self.job.name, self.meta_host, self.atomic_group_id,
2608 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002609
showard77182562009-06-10 00:16:05 +00002610 return self._do_run_pre_job_tasks()
2611
2612
2613 def _do_run_pre_job_tasks(self):
2614 # Every host goes thru the Verifying stage (which may or may not
2615 # actually do anything as determined by get_pre_job_tasks).
2616 self.set_status(models.HostQueueEntry.Status.VERIFYING)
2617
2618 # The pre job tasks always end with a SetEntryPendingTask which
2619 # will continue as appropriate through queue_entry.on_pending().
2620 return Agent(self.job.get_pre_job_tasks(queue_entry=self))
mblighe2586682008-02-29 22:45:46 +00002621
showard6ae5ea92009-02-25 00:11:51 +00002622
jadmanski0afbb632008-06-06 21:10:57 +00002623 def requeue(self):
2624 self.set_status('Queued')
showard12f3e322009-05-13 21:27:42 +00002625 self.update_field('started_on', None)
showardde634ee2009-01-30 01:44:24 +00002626 # verify/cleanup failure sets the execution subdir, so reset it here
2627 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002628 if self.meta_host:
2629 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002630
2631
jadmanski0afbb632008-06-06 21:10:57 +00002632 def handle_host_failure(self):
2633 """\
2634 Called when this queue entry's host has failed verification and
2635 repair.
2636 """
2637 assert not self.meta_host
2638 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002639 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002640
2641
jadmanskif7fa2cc2008-10-01 14:13:23 +00002642 @property
2643 def aborted_by(self):
2644 self._load_abort_info()
2645 return self._aborted_by
2646
2647
2648 @property
2649 def aborted_on(self):
2650 self._load_abort_info()
2651 return self._aborted_on
2652
2653
2654 def _load_abort_info(self):
2655 """ Fetch info about who aborted the job. """
2656 if hasattr(self, "_aborted_by"):
2657 return
2658 rows = _db.execute("""
2659 SELECT users.login, aborted_host_queue_entries.aborted_on
2660 FROM aborted_host_queue_entries
2661 INNER JOIN users
2662 ON users.id = aborted_host_queue_entries.aborted_by_id
2663 WHERE aborted_host_queue_entries.queue_entry_id = %s
2664 """, (self.id,))
2665 if rows:
2666 self._aborted_by, self._aborted_on = rows[0]
2667 else:
2668 self._aborted_by = self._aborted_on = None
2669
2670
showardb2e2c322008-10-14 17:33:55 +00002671 def on_pending(self):
2672 """
2673 Called when an entry in a synchronous job has passed verify. If the
2674 job is ready to run, returns an agent to run the job. Returns None
2675 otherwise.
2676 """
2677 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002678 self.get_host().set_status('Pending')
showard77182562009-06-10 00:16:05 +00002679 return self.job.run_if_ready(queue_entry=self)
showardb2e2c322008-10-14 17:33:55 +00002680
2681
showardd3dc1992009-04-22 21:01:40 +00002682 def abort(self, dispatcher):
2683 assert self.aborted and not self.complete
showard1be97432008-10-17 15:30:45 +00002684
showardd3dc1992009-04-22 21:01:40 +00002685 Status = models.HostQueueEntry.Status
2686 has_running_job_agent = (
2687 self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
2688 and dispatcher.get_agents_for_entry(self))
2689 if has_running_job_agent:
2690 # do nothing; post-job tasks will finish and then mark this entry
2691 # with status "Aborted" and take care of the host
2692 return
2693
2694 if self.status in (Status.STARTING, Status.PENDING):
2695 self.host.set_status(models.Host.Status.READY)
2696 elif self.status == Status.VERIFYING:
2697 dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))
2698
2699 self.set_status(Status.ABORTED)
showard170873e2009-01-07 00:22:26 +00002700
2701 def execution_tag(self):
2702 assert self.execution_subdir
2703 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002704
2705
mbligh36768f02008-02-22 18:28:33 +00002706class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002707 _table_name = 'jobs'
2708 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2709 'control_type', 'created_on', 'synch_count', 'timeout',
showarda1e74b32009-05-12 17:32:04 +00002710 'run_verify', 'email_list', 'reboot_before', 'reboot_after',
showard12f3e322009-05-13 21:27:42 +00002711 'parse_failed_repair', 'max_runtime_hrs')
showard6ae5ea92009-02-25 00:11:51 +00002712
showard77182562009-06-10 00:16:05 +00002713 # This does not need to be a column in the DB. The delays are likely to
2714 # be configured short. If the scheduler is stopped and restarted in
2715 # the middle of a job's delay cycle, the delay cycle will either be
2716 # repeated or skipped depending on the number of Pending machines found
2717 # when the restarted scheduler recovers to track it. Not a problem.
2718 #
2719 # A reference to the DelayedCallTask that will wake up the job should
2720 # no other HQEs change state in time. Its end_time attribute is used
2721 # by our run_with_ready_delay() method to determine if the wait is over.
2722 _delay_ready_task = None
2723
2724 # TODO(gps): On scheduler start/recovery we need to call HQE.on_pending() on
2725 # all status='Pending' atomic group HQEs incase a delay was running when the
2726 # scheduler was restarted and no more hosts ever successfully exit Verify.
showard6ae5ea92009-02-25 00:11:51 +00002727
showarda3c58572009-03-12 20:36:59 +00002728 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002729 assert id or row
showarda3c58572009-03-12 20:36:59 +00002730 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002731
mblighe2586682008-02-29 22:45:46 +00002732
jadmanski0afbb632008-06-06 21:10:57 +00002733 def is_server_job(self):
2734 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002735
2736
showard170873e2009-01-07 00:22:26 +00002737 def tag(self):
2738 return "%s-%s" % (self.id, self.owner)
2739
2740
jadmanski0afbb632008-06-06 21:10:57 +00002741 def get_host_queue_entries(self):
2742 rows = _db.execute("""
2743 SELECT * FROM host_queue_entries
2744 WHERE job_id= %s
2745 """, (self.id,))
2746 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002747
jadmanski0afbb632008-06-06 21:10:57 +00002748 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002749
jadmanski0afbb632008-06-06 21:10:57 +00002750 return entries
mbligh36768f02008-02-22 18:28:33 +00002751
2752
jadmanski0afbb632008-06-06 21:10:57 +00002753 def set_status(self, status, update_queues=False):
2754 self.update_field('status',status)
2755
2756 if update_queues:
2757 for queue_entry in self.get_host_queue_entries():
2758 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002759
2760
showard77182562009-06-10 00:16:05 +00002761 def _atomic_and_has_started(self):
2762 """
2763 @returns True if any of the HostQueueEntries associated with this job
2764 have entered the Status.STARTING state or beyond.
2765 """
2766 atomic_entries = models.HostQueueEntry.objects.filter(
2767 job=self.id, atomic_group__isnull=False)
2768 if atomic_entries.count() <= 0:
2769 return False
2770
2771 non_started_statuses = (models.HostQueueEntry.Status.QUEUED,
2772 models.HostQueueEntry.Status.VERIFYING,
2773 models.HostQueueEntry.Status.PENDING)
2774 started_entries = atomic_entries.exclude(
2775 status__in=non_started_statuses)
2776 return started_entries.count() > 0
2777
2778
2779 def _pending_count(self):
2780 """The number of HostQueueEntries for this job in the Pending state."""
2781 pending_entries = models.HostQueueEntry.objects.filter(
2782 job=self.id, status=models.HostQueueEntry.Status.PENDING)
2783 return pending_entries.count()
2784
2785
jadmanski0afbb632008-06-06 21:10:57 +00002786 def is_ready(self):
showard77182562009-06-10 00:16:05 +00002787 # NOTE: Atomic group jobs stop reporting ready after they have been
2788 # started to avoid launching multiple copies of one atomic job.
2789 # Only possible if synch_count is less than than half the number of
2790 # machines in the atomic group.
2791 return (self._pending_count() >= self.synch_count
2792 and not self._atomic_and_has_started())
mbligh36768f02008-02-22 18:28:33 +00002793
2794
jadmanski0afbb632008-06-06 21:10:57 +00002795 def num_machines(self, clause = None):
2796 sql = "job_id=%s" % self.id
2797 if clause:
2798 sql += " AND (%s)" % clause
2799 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002800
2801
jadmanski0afbb632008-06-06 21:10:57 +00002802 def num_queued(self):
2803 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002804
2805
jadmanski0afbb632008-06-06 21:10:57 +00002806 def num_active(self):
2807 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002808
2809
jadmanski0afbb632008-06-06 21:10:57 +00002810 def num_complete(self):
2811 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002812
2813
jadmanski0afbb632008-06-06 21:10:57 +00002814 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002815 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002816
mbligh36768f02008-02-22 18:28:33 +00002817
showard6bb7c292009-01-30 01:44:51 +00002818 def _not_yet_run_entries(self, include_verifying=True):
2819 statuses = [models.HostQueueEntry.Status.QUEUED,
2820 models.HostQueueEntry.Status.PENDING]
2821 if include_verifying:
2822 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2823 return models.HostQueueEntry.objects.filter(job=self.id,
2824 status__in=statuses)
2825
2826
2827 def _stop_all_entries(self):
2828 entries_to_stop = self._not_yet_run_entries(
2829 include_verifying=False)
2830 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002831 assert not child_entry.complete, (
2832 '%s status=%s, active=%s, complete=%s' %
2833 (child_entry.id, child_entry.status, child_entry.active,
2834 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002835 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2836 child_entry.host.status = models.Host.Status.READY
2837 child_entry.host.save()
2838 child_entry.status = models.HostQueueEntry.Status.STOPPED
2839 child_entry.save()
2840
showard2bab8f42008-11-12 18:15:22 +00002841 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002842 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002843 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002844 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002845
2846
jadmanski0afbb632008-06-06 21:10:57 +00002847 def write_to_machines_file(self, queue_entry):
2848 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002849 file_path = os.path.join(self.tag(), '.machines')
2850 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002851
2852
showardf1ae3542009-05-11 19:26:02 +00002853 def _next_group_name(self, group_name=''):
2854 """@returns a directory name to use for the next host group results."""
2855 if group_name:
2856 # Sanitize for use as a pathname.
2857 group_name = group_name.replace(os.path.sep, '_')
2858 if group_name.startswith('.'):
2859 group_name = '_' + group_name[1:]
2860 # Add a separator between the group name and 'group%d'.
2861 group_name += '.'
2862 group_count_re = re.compile(r'%sgroup(\d+)' % re.escape(group_name))
showard2bab8f42008-11-12 18:15:22 +00002863 query = models.HostQueueEntry.objects.filter(
2864 job=self.id).values('execution_subdir').distinct()
2865 subdirs = (entry['execution_subdir'] for entry in query)
showardf1ae3542009-05-11 19:26:02 +00002866 group_matches = (group_count_re.match(subdir) for subdir in subdirs)
2867 ids = [int(match.group(1)) for match in group_matches if match]
showard2bab8f42008-11-12 18:15:22 +00002868 if ids:
2869 next_id = max(ids) + 1
2870 else:
2871 next_id = 0
showardf1ae3542009-05-11 19:26:02 +00002872 return '%sgroup%d' % (group_name, next_id)
showard2bab8f42008-11-12 18:15:22 +00002873
2874
showard170873e2009-01-07 00:22:26 +00002875 def _write_control_file(self, execution_tag):
2876 control_path = _drone_manager.attach_file_to_execution(
2877 execution_tag, self.control_file)
2878 return control_path
mbligh36768f02008-02-22 18:28:33 +00002879
showardb2e2c322008-10-14 17:33:55 +00002880
showard2bab8f42008-11-12 18:15:22 +00002881 def get_group_entries(self, queue_entry_from_group):
2882 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002883 return list(HostQueueEntry.fetch(
2884 where='job_id=%s AND execution_subdir=%s',
2885 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002886
2887
showardb2e2c322008-10-14 17:33:55 +00002888 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002889 assert queue_entries
2890 execution_tag = queue_entries[0].execution_tag()
2891 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002892 hostnames = ','.join([entry.get_host().hostname
2893 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002894
showard87ba02a2009-04-20 19:37:32 +00002895 params = _autoserv_command_line(
2896 hostnames, execution_tag,
2897 ['-P', execution_tag, '-n',
2898 _drone_manager.absolute_path(control_path)],
2899 job=self)
mbligh36768f02008-02-22 18:28:33 +00002900
jadmanski0afbb632008-06-06 21:10:57 +00002901 if not self.is_server_job():
2902 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002903
showardb2e2c322008-10-14 17:33:55 +00002904 return params
mblighe2586682008-02-29 22:45:46 +00002905
mbligh36768f02008-02-22 18:28:33 +00002906
showardc9ae1782009-01-30 01:42:37 +00002907 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002908 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002909 return True
showard0fc38302008-10-23 00:44:07 +00002910 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002911 return queue_entry.get_host().dirty
2912 return False
showard21baa452008-10-21 00:08:39 +00002913
showardc9ae1782009-01-30 01:42:37 +00002914
2915 def _should_run_verify(self, queue_entry):
2916 do_not_verify = (queue_entry.host.protection ==
2917 host_protections.Protection.DO_NOT_VERIFY)
2918 if do_not_verify:
2919 return False
2920 return self.run_verify
2921
2922
showard77182562009-06-10 00:16:05 +00002923 def get_pre_job_tasks(self, queue_entry):
2924 """
2925 Get a list of tasks to perform before the host_queue_entry
2926 may be used to run this Job (such as Cleanup & Verify).
2927
2928 @returns A list of tasks to be done to the given queue_entry before
2929 it should be considered be ready to run this job. The last
2930 task in the list calls HostQueueEntry.on_pending(), which
2931 continues the flow of the job.
2932 """
showard21baa452008-10-21 00:08:39 +00002933 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002934 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002935 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002936 if self._should_run_verify(queue_entry):
2937 tasks.append(VerifyTask(queue_entry=queue_entry))
2938 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002939 return tasks
2940
2941
showardf1ae3542009-05-11 19:26:02 +00002942 def _assign_new_group(self, queue_entries, group_name=''):
showard2bab8f42008-11-12 18:15:22 +00002943 if len(queue_entries) == 1:
showardf1ae3542009-05-11 19:26:02 +00002944 group_subdir_name = queue_entries[0].get_host().hostname
showard2bab8f42008-11-12 18:15:22 +00002945 else:
showardf1ae3542009-05-11 19:26:02 +00002946 group_subdir_name = self._next_group_name(group_name)
showardb18134f2009-03-20 20:52:18 +00002947 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002948 self.id, [entry.host.hostname for entry in queue_entries],
showardf1ae3542009-05-11 19:26:02 +00002949 group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002950
2951 for queue_entry in queue_entries:
showardf1ae3542009-05-11 19:26:02 +00002952 queue_entry.set_execution_subdir(group_subdir_name)
showard2bab8f42008-11-12 18:15:22 +00002953
2954
2955 def _choose_group_to_run(self, include_queue_entry):
showardf1ae3542009-05-11 19:26:02 +00002956 """
2957 @returns A tuple containing a list of HostQueueEntry instances to be
2958 used to run this Job, a string group name to suggest giving
showard54c1ea92009-05-20 00:32:58 +00002959 to this job in the results database.
showardf1ae3542009-05-11 19:26:02 +00002960 """
showard77182562009-06-10 00:16:05 +00002961 atomic_group = include_queue_entry.atomic_group
showard2bab8f42008-11-12 18:15:22 +00002962 chosen_entries = [include_queue_entry]
showardf1ae3542009-05-11 19:26:02 +00002963 if atomic_group:
2964 num_entries_wanted = atomic_group.max_number_of_machines
2965 else:
2966 num_entries_wanted = self.synch_count
2967 num_entries_wanted -= len(chosen_entries)
showard2bab8f42008-11-12 18:15:22 +00002968
showardf1ae3542009-05-11 19:26:02 +00002969 if num_entries_wanted > 0:
2970 where_clause = 'job_id = %s AND status = "Pending" AND id != %s'
showard54c1ea92009-05-20 00:32:58 +00002971 pending_entries = list(HostQueueEntry.fetch(
showardf1ae3542009-05-11 19:26:02 +00002972 where=where_clause,
showard54c1ea92009-05-20 00:32:58 +00002973 params=(self.id, include_queue_entry.id)))
2974
2975 # Sort the chosen hosts by hostname before slicing.
2976 def cmp_queue_entries_by_hostname(entry_a, entry_b):
2977 return Host.cmp_for_sort(entry_a.host, entry_b.host)
2978 pending_entries.sort(cmp=cmp_queue_entries_by_hostname)
2979 chosen_entries += pending_entries[:num_entries_wanted]
showard2bab8f42008-11-12 18:15:22 +00002980
showardf1ae3542009-05-11 19:26:02 +00002981 # Sanity check. We'll only ever be called if this can be met.
2982 assert len(chosen_entries) >= self.synch_count
2983
2984 if atomic_group:
2985 # Look at any meta_host and dependency labels and pick the first
2986 # one that also specifies this atomic group. Use that label name
2987 # as the group name if possible (it is more specific).
2988 group_name = atomic_group.name
2989 for label in include_queue_entry.get_labels():
2990 if label.atomic_group_id:
2991 assert label.atomic_group_id == atomic_group.id
2992 group_name = label.name
2993 break
2994 else:
2995 group_name = ''
2996
2997 self._assign_new_group(chosen_entries, group_name=group_name)
2998 return chosen_entries, group_name
showard2bab8f42008-11-12 18:15:22 +00002999
3000
showard77182562009-06-10 00:16:05 +00003001 def run_if_ready(self, queue_entry):
3002 """
3003 @returns An Agent instance to ultimately run this job if enough hosts
3004 are ready for it to run.
3005 @returns None and potentially cleans up excess hosts if this Job
3006 is not ready to run.
3007 """
showardb2e2c322008-10-14 17:33:55 +00003008 if not self.is_ready():
showard77182562009-06-10 00:16:05 +00003009 self.stop_if_necessary()
3010 return None
mbligh36768f02008-02-22 18:28:33 +00003011
showard77182562009-06-10 00:16:05 +00003012 if queue_entry.atomic_group:
3013 return self.run_with_ready_delay(queue_entry)
3014
3015 return self.run(queue_entry)
3016
3017
3018 def run_with_ready_delay(self, queue_entry):
3019 """
3020 Start a delay to wait for more hosts to enter Pending state before
3021 launching an atomic group job. Once set, the a delay cannot be reset.
3022
3023 @param queue_entry: The HostQueueEntry object to get atomic group
3024 info from and pass to run_if_ready when the delay is up.
3025
3026 @returns An Agent to run the job as appropriate or None if a delay
3027 has already been set.
3028 """
3029 assert queue_entry.job_id == self.id
3030 assert queue_entry.atomic_group
3031 delay = scheduler_config.config.secs_to_wait_for_atomic_group_hosts
3032 pending_threshold = queue_entry.atomic_group.max_number_of_machines
3033 over_max_threshold = (self._pending_count() >= pending_threshold)
3034 delay_expired = (self._delay_ready_task and
3035 time.time() >= self._delay_ready_task.end_time)
3036
3037 # Delay is disabled or we already have enough? Do not wait to run.
3038 if not delay or over_max_threshold or delay_expired:
3039 return self.run(queue_entry)
3040
3041 # A delay was previously scheduled.
3042 if self._delay_ready_task:
3043 return None
3044
3045 def run_job_after_delay():
3046 logging.info('Job %s done waiting for extra hosts.', self.id)
3047 return self.run(queue_entry)
3048
3049 self._delay_ready_task = DelayedCallTask(delay_seconds=delay,
3050 callback=run_job_after_delay)
3051
3052 return Agent([self._delay_ready_task], num_processes=0)
3053
3054
3055 def run(self, queue_entry):
3056 """
3057 @param queue_entry: The HostQueueEntry instance calling this method.
3058 @returns An Agent instance to run this job or None if we've already
3059 been run.
3060 """
3061 if queue_entry.atomic_group and self._atomic_and_has_started():
3062 logging.error('Job.run() called on running atomic Job %d '
3063 'with HQE %s.', self.id, queue_entry)
3064 return None
showardf1ae3542009-05-11 19:26:02 +00003065 queue_entries, group_name = self._choose_group_to_run(queue_entry)
3066 return self._finish_run(queue_entries, group_name)
showardb2e2c322008-10-14 17:33:55 +00003067
3068
showardf1ae3542009-05-11 19:26:02 +00003069 def _finish_run(self, queue_entries, group_name):
showardb2ccdda2008-10-28 20:39:05 +00003070 for queue_entry in queue_entries:
3071 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00003072 params = self._get_autoserv_params(queue_entries)
3073 queue_task = QueueTask(job=self, queue_entries=queue_entries,
showardf1ae3542009-05-11 19:26:02 +00003074 cmd=params, group_name=group_name)
3075 tasks = [queue_task]
showard77182562009-06-10 00:16:05 +00003076 if self._delay_ready_task:
3077 # Cancel any pending callback that would try to run again
3078 # as we are already running.
3079 self._delay_ready_task.abort()
showardb2e2c322008-10-14 17:33:55 +00003080
showard170873e2009-01-07 00:22:26 +00003081 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00003082
3083
if __name__ == '__main__':
    # Script entry point; main() is defined earlier in this file.
    main()