blob: b52e9f84e50a86e57e3ee25975d371083a89719b [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
mblighf3294cc2009-04-08 21:17:38 +000019from autotest_lib.scheduler import monitor_db_cleanup
showardd1ee1dd2009-01-07 21:33:08 +000020from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000021
mblighb090f142008-02-27 21:33:46 +000022
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Root of the autotest tree; overridable via $AUTOTEST_DIR.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# dict.has_key() is deprecated; the `in` operator is equivalent and works on
# both Python 2 and 3.
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# pidfile names written by the autoserv / crashinfo / parser phases of a job
_AUTOSERV_PID_FILE = '.autoserv_execute'
_CRASHINFO_PID_FILE = '.collect_crashinfo_execute'
_PARSER_PID_FILE = '.parser_execute'

_ALL_PIDFILE_NAMES = (_AUTOSERV_PID_FILE, _CRASHINFO_PID_FILE,
                      _PARSER_PID_FILE)

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# module-level scheduler state, initialized by init()/main()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
if 'AUTOTEST_SCHEDULER_LOG_DIR' not in os.environ:
    os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
if 'AUTOTEST_SCHEDULER_LOG_NAME' in os.environ:
    scheduler_log_name = os.environ['AUTOTEST_SCHEDULER_LOG_NAME']
else:
    scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
    os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name

logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
76
mbligh36768f02008-02-22 18:28:33 +000077
def main():
    """Top-level entry point.

    Delegates to main_without_exception_handling() and makes sure any
    exception that escapes is written to the scheduler log before being
    re-raised (the bare except is deliberate: we want SystemExit and
    KeyboardInterrupt logged too, and we always re-raise).
    """
    try:
        main_without_exception_handling()
    except:
        logging.exception('Exception escaping in monitor_db')
        raise
85
def main_without_exception_handling():
    """Parse options, load configuration, then run the dispatcher loop.

    Expects one positional argument, the results directory, which becomes
    RESULTS_DIR and the process working directory.  Runs until a SIGINT
    sets the module-level _shutdown flag (see handle_sigint), then shuts
    down the status server, drone manager and database connection.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues in case we were launched
    # from somewhere odd (such as a random NFS home directory of the person
    # running sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # config value is a list of statuses separated by any of space , ; :
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # under test: substitute a dummy autoserv binary and flag test mode
        # (init() will also switch to the stresstest database)
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.initialize(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # log and fall through to the cleanup below; the traceback is
        # emailed rather than re-raised so shutdown still happens
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000161
162
def handle_sigint(signum, frame):
    """SIGINT handler: request a clean shutdown of the dispatcher loop.

    Only sets the module-level _shutdown flag; the main loop in
    main_without_exception_handling() notices it on the next tick.
    """
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000167
168
def init(logfile):
    """One-time process initialization.

    Sets up optional stdout/stderr redirection, writes the monitor_db
    pidfile, connects the scheduler and Django database connections,
    installs the SIGINT handler, and initializes the global drone manager
    from the config file.

    @param logfile: path to redirect stdout/stderr to, or falsy to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    utils.write_pid("monitor_db")

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # named drone_names (not "drones") to avoid shadowing the drones module
    # imported at the top of this file
    drone_names = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_names.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000200
201
def enable_logging(logfile):
    """Redirect this process's stdout and stderr into log files.

    stdout goes to *logfile* and stderr to *logfile*.err, both opened in
    append mode and unbuffered (NOTE: unbuffered text mode is a Python 2
    behavior).  The underlying file descriptors are duplicated too, so
    child processes inherit the redirection.
    """
    out_path = logfile
    err_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_path, err_path)

    out_stream = open(out_path, "a", buffering=0)
    err_stream = open(err_path, "a", buffering=0)

    # re-point the OS-level descriptors first...
    os.dup2(out_stream.fileno(), sys.stdout.fileno())
    os.dup2(err_stream.fileno(), sys.stderr.fileno())

    # ...then the Python-level file objects
    sys.stdout = out_stream
    sys.stderr = err_stream
mbligh36768f02008-02-22 18:28:33 +0000214
215
def _autoserv_command_line(machines, results_dir, extra_args, job=None,
                           queue_entry=None):
    """Build the argv list used to launch autoserv.

    @param machines: comma-separated machine list for autoserv's -m flag.
    @param results_dir: results directory, resolved through the drone
            manager to an absolute path for -r.
    @param extra_args: list of additional arguments appended verbatim.
    @param job: optional Job supplying -u (owner) and -l (name); if omitted
            but queue_entry is given, the entry's job is used instead.
    @param queue_entry: optional HostQueueEntry used to look up the job.

    @returns The complete argv list.
    """
    argv = [_autoserv_path, '-p', '-m', machines,
            '-r', _drone_manager.absolute_path(results_dir)]
    if job or queue_entry:
        actual_job = job if job else queue_entry.job
        argv += ['-u', actual_job.owner, '-l', actual_job.name]
    return argv + extra_args
225
226
class SchedulerError(Exception):
    """Error raised by HostScheduler when it detects an inconsistent state
    (for example, a host carrying more than one atomic group label)."""
    pass
229
230
class HostScheduler(object):
    """Matches pending host queue entries against ready hosts.

    refresh() snapshots scheduling state (ready hosts, ACLs, labels,
    dependencies) from the database; the find_eligible_* methods then work
    purely against that cached state, popping each host out of the internal
    availability map as it is handed out, so a host is assigned at most once
    per cycle.
    """

    def _get_ready_hosts(self):
        """Return {host_id: Host} for every unlocked, Ready (or status-less)
        host that has no active queue entry against it."""
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render a sequence of ids as a comma-separated string suitable for
        interpolation into a SQL IN (...) clause."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (which contains a single %s placeholder for the id
        list) and fold the two-column result into a dict of sets; see
        _process_many2many_dict for the *flip* semantics."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold two-column rows into {left_id: set(right_ids)}.  With
        flip=True the roles of the columns are swapped."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map each job id to the set of ACL group ids its owner belongs to."""
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map each job id to the set of host ids marked ineligible for it."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map each job id to the set of label ids the job depends on."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map each host id to the set of ACL group ids it belongs to."""
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) for the given hosts,
        both as dicts of id -> set of ids built from the same query."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Return {label_id: Label} for every label in the database."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot all scheduling state needed for one cycle: ready hosts,
        plus ACL/dependency/label data for the jobs behind
        *pending_queue_entries*."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        """True if the job's owner shares at least one ACL group with the
        host."""
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True if the host's labels include every job dependency label."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """True if the host passes all four checks for the entry's job:
        ACLs, dependency labels, only_if_needed labels and atomic group."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim and return the entry's explicitly requested host if it is
        eligible and still available, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        """True if the host is still unclaimed this cycle and not invalid."""
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Pick, claim and return an eligible host carrying the entry's
        metahost label, or None if none qualifies this cycle."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a single host for this (non-atomic-group) queue
        entry, or return None if nothing suitable is available."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts? not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
575
576
showard170873e2009-01-07 00:22:26 +0000577class Dispatcher(object):
    def __init__(self):
        # Agents currently being advanced by _handle_agents each tick.
        self._agents = []
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        user_cleanup_time = scheduler_config.config.clean_interval
        self._periodic_cleanup = monitor_db_cleanup.UserCleanup(
            _db, user_cleanup_time)
        self._24hr_upkeep = monitor_db_cleanup.TwentyFourHourUpkeep(_db)
        # host id -> set of Agents touching that host (maintained by
        # add_agent/remove_agent)
        self._host_agents = {}
        # queue entry id -> set of Agents touching that entry
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000588
mbligh36768f02008-02-22 18:28:33 +0000589
    def initialize(self, recover_hosts=True):
        """One-time startup: prime the cleanup schedules and recover state
        left over from a previous scheduler run.

        @param recover_hosts: when False (scheduler started without
                --recover-hosts), skip re-verifying hosts.
        """
        self._periodic_cleanup.initialize()
        self._24hr_upkeep.initialize()

        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000599
600
    def tick(self):
        """Run one scheduler cycle: refresh drone state, run due cleanups,
        process aborts, schedule new jobs, advance all agents, then flush
        pending drone actions and queued email.  The order is significant."""
        _drone_manager.refresh()
        self._run_cleanup()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000609
showard97aed502008-11-04 02:01:24 +0000610
    def _run_cleanup(self):
        # "maybe": each cleanup object decides internally whether its
        # interval has elapsed before doing any work
        self._periodic_cleanup.run_cleanup_maybe()
        self._24hr_upkeep.run_cleanup_maybe()
showarda3ab0d52008-11-03 19:03:47 +0000614
mbligh36768f02008-02-22 18:28:33 +0000615
showard170873e2009-01-07 00:22:26 +0000616 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
617 for object_id in object_ids:
618 agent_dict.setdefault(object_id, set()).add(agent)
619
620
621 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
622 for object_id in object_ids:
623 assert object_id in agent_dict
624 agent_dict[object_id].remove(agent)
625
626
    def add_agent(self, agent):
        """Take ownership of *agent*: track it for _handle_agents and index
        it by every host id and queue entry id it touches."""
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000633
showard170873e2009-01-07 00:22:26 +0000634
635 def get_agents_for_entry(self, queue_entry):
636 """
637 Find agents corresponding to the specified queue_entry.
638 """
showardd3dc1992009-04-22 21:01:40 +0000639 return list(self._queue_entry_agents.get(queue_entry.id, set()))
showard170873e2009-01-07 00:22:26 +0000640
641
642 def host_has_agent(self, host):
643 """
644 Determine if there is currently an Agent present using this host.
645 """
646 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000647
648
    def remove_agent(self, agent):
        """Inverse of add_agent: stop tracking *agent* and drop it from the
        host and queue-entry indexes."""
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000655
656
    def _recover_processes(self):
        """Startup recovery: re-attach to processes left by a previous
        scheduler run, requeue entries whose process is gone, and re-verify
        the remaining hosts.  Call order is significant."""
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_all_recoverable_entries()
        self._requeue_other_active_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000667
showard170873e2009-01-07 00:22:26 +0000668
    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Gathering', 'Parsing')")
        for queue_entry in queue_entries:
            # register all three pidfile types for each entry's execution
            # directory -- presumably so the following drone_manager.refresh()
            # (see _recover_processes) picks up their contents; confirm
            # against DroneManager.register_pidfile.
            for pidfile_name in _ALL_PIDFILE_NAMES:
                pidfile_id = _drone_manager.get_pidfile_id_from(
                    queue_entry.execution_tag(), pidfile_name=pidfile_name)
                _drone_manager.register_pidfile(pidfile_id)
showard170873e2009-01-07 00:22:26 +0000679
680
showardd3dc1992009-04-22 21:01:40 +0000681 def _recover_entries_with_status(self, status, orphans, pidfile_name,
682 recover_entries_fn):
683 queue_entries = HostQueueEntry.fetch(where="status = '%s'" % status)
showard170873e2009-01-07 00:22:26 +0000684 for queue_entry in queue_entries:
685 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000686 # synchronous job we've already recovered
687 continue
showardd3dc1992009-04-22 21:01:40 +0000688 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showard170873e2009-01-07 00:22:26 +0000689 execution_tag = queue_entry.execution_tag()
690 run_monitor = PidfileRunMonitor()
showardd3dc1992009-04-22 21:01:40 +0000691 run_monitor.attach_to_existing_process(execution_tag,
692 pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000693 if not run_monitor.has_process():
showardd3dc1992009-04-22 21:01:40 +0000694 # execution apparently never happened
695 recover_entries_fn(queue_entry.job, queue_entries, None)
showard170873e2009-01-07 00:22:26 +0000696 continue
mbligh90a549d2008-03-25 23:52:34 +0000697
showardd3dc1992009-04-22 21:01:40 +0000698 logging.info('Recovering %s entry %s (process %s)',
699 status.lower(),
700 ', '.join(str(entry) for entry in queue_entries),
701 run_monitor.get_process())
702 recover_entries_fn(queue_entry.job, queue_entries, run_monitor)
703 orphans.discard(run_monitor.get_process())
704
705
706 def _kill_remaining_orphan_processes(self, orphans):
707 for process in orphans:
showardb18134f2009-03-20 20:52:18 +0000708 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000709 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000710
showard170873e2009-01-07 00:22:26 +0000711
showardd3dc1992009-04-22 21:01:40 +0000712 def _recover_running_entries(self, orphans):
713 def recover_entries(job, queue_entries, run_monitor):
714 if run_monitor is not None:
715 queue_task = RecoveryQueueTask(job=job,
716 queue_entries=queue_entries,
717 run_monitor=run_monitor)
718 self.add_agent(Agent(tasks=[queue_task],
719 num_processes=len(queue_entries)))
720 # else, _requeue_other_active_entries will cover this
721
722 self._recover_entries_with_status(models.HostQueueEntry.Status.RUNNING,
723 orphans, '.autoserv_execute',
724 recover_entries)
725
726
727 def _recover_gathering_entries(self, orphans):
728 def recover_entries(job, queue_entries, run_monitor):
729 gather_task = GatherLogsTask(job, queue_entries,
730 run_monitor=run_monitor)
731 self.add_agent(Agent([gather_task]))
732
733 self._recover_entries_with_status(
734 models.HostQueueEntry.Status.GATHERING,
735 orphans, _CRASHINFO_PID_FILE, recover_entries)
736
737
738 def _recover_parsing_entries(self, orphans):
739 def recover_entries(job, queue_entries, run_monitor):
740 reparse_task = FinalReparseTask(queue_entries,
741 run_monitor=run_monitor)
742 self.add_agent(Agent([reparse_task], num_processes=0))
743
744 self._recover_entries_with_status(models.HostQueueEntry.Status.PARSING,
745 orphans, _PARSER_PID_FILE,
746 recover_entries)
747
748
    def _recover_all_recoverable_entries(self):
        """
        Reattach to processes for all in-flight entries, then kill the
        leftovers.  Each recovery phase discards reattached processes from
        the orphan set; whatever remains is truly orphaned.
        """
        orphans = _drone_manager.get_orphaned_autoserv_processes()
        self._recover_running_entries(orphans)
        self._recover_gathering_entries(orphans)
        self._recover_parsing_entries(orphans)
        self._kill_remaining_orphan_processes(orphans)
jadmanski0afbb632008-06-06 21:10:57 +0000755
showard97aed502008-11-04 02:01:24 +0000756
showard170873e2009-01-07 00:22:26 +0000757 def _requeue_other_active_entries(self):
758 queue_entries = HostQueueEntry.fetch(
showardd3dc1992009-04-22 21:01:40 +0000759 where='active AND NOT complete AND '
760 '(aborted OR status != "Pending")')
showard170873e2009-01-07 00:22:26 +0000761 for queue_entry in queue_entries:
762 if self.get_agents_for_entry(queue_entry):
763 # entry has already been recovered
764 continue
showardd3dc1992009-04-22 21:01:40 +0000765 if queue_entry.aborted:
766 queue_entry.abort(self)
767 continue
768
769 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
showardb18134f2009-03-20 20:52:18 +0000770 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000771 if queue_entry.host:
772 tasks = queue_entry.host.reverify_tasks()
773 self.add_agent(Agent(tasks))
774 agent = queue_entry.requeue()
775
776
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000792
793
jadmanski0afbb632008-06-06 21:10:57 +0000794 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000795 print_message='Reverifying host %s'):
796 full_where='locked = 0 AND invalid = 0 AND ' + where
797 for host in Host.fetch(where=full_where):
798 if self.host_has_agent(host):
799 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000800 continue
showard170873e2009-01-07 00:22:26 +0000801 if print_message:
showardb18134f2009-03-20 20:52:18 +0000802 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000803 tasks = host.reverify_tasks()
804 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000805
806
jadmanski0afbb632008-06-06 21:10:57 +0000807 def _recover_hosts(self):
808 # recover "Repair Failed" hosts
809 message = 'Reverifying dead host %s'
810 self._reverify_hosts_where("status = 'Repair Failed'",
811 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000812
813
showard04c82c52008-05-29 19:38:12 +0000814
showardb95b1bd2008-08-15 18:11:04 +0000815 def _get_pending_queue_entries(self):
showard63a34772008-08-18 19:32:50 +0000816 # prioritize by job priority, then non-metahost over metahost, then FIFO
817 return list(HostQueueEntry.fetch(
showard25cbdbd2009-02-17 20:57:21 +0000818 joins='INNER JOIN jobs ON (job_id=jobs.id)',
showardac9ce222008-12-03 18:19:44 +0000819 where='NOT complete AND NOT active AND status="Queued"',
showard25cbdbd2009-02-17 20:57:21 +0000820 order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000821
822
showard89f84db2009-03-12 20:39:13 +0000823 def _refresh_pending_queue_entries(self):
824 """
825 Lookup the pending HostQueueEntries and call our HostScheduler
826 refresh() method given that list. Return the list.
827
828 @returns A list of pending HostQueueEntries sorted in priority order.
829 """
showard63a34772008-08-18 19:32:50 +0000830 queue_entries = self._get_pending_queue_entries()
831 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000832 return []
showardb95b1bd2008-08-15 18:11:04 +0000833
showard63a34772008-08-18 19:32:50 +0000834 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000835
showard89f84db2009-03-12 20:39:13 +0000836 return queue_entries
837
838
839 def _schedule_atomic_group(self, queue_entry):
840 """
841 Schedule the given queue_entry on an atomic group of hosts.
842
843 Returns immediately if there are insufficient available hosts.
844
845 Creates new HostQueueEntries based off of queue_entry for the
846 scheduled hosts and starts them all running.
847 """
848 # This is a virtual host queue entry representing an entire
849 # atomic group, find a group and schedule their hosts.
850 group_hosts = self._host_scheduler.find_eligible_atomic_group(
851 queue_entry)
852 if not group_hosts:
853 return
854 # The first assigned host uses the original HostQueueEntry
855 group_queue_entries = [queue_entry]
856 for assigned_host in group_hosts[1:]:
857 # Create a new HQE for every additional assigned_host.
858 new_hqe = HostQueueEntry.clone(queue_entry)
859 new_hqe.save()
860 group_queue_entries.append(new_hqe)
861 assert len(group_queue_entries) == len(group_hosts)
862 for queue_entry, host in itertools.izip(group_queue_entries,
863 group_hosts):
864 self._run_queue_entry(queue_entry, host)
865
866
867 def _schedule_new_jobs(self):
868 queue_entries = self._refresh_pending_queue_entries()
869 if not queue_entries:
870 return
871
showard63a34772008-08-18 19:32:50 +0000872 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000873 if (queue_entry.atomic_group_id is None or
874 queue_entry.host_id is not None):
875 assigned_host = self._host_scheduler.find_eligible_host(
876 queue_entry)
877 if assigned_host:
878 self._run_queue_entry(queue_entry, assigned_host)
879 else:
880 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000881
882
883 def _run_queue_entry(self, queue_entry, host):
884 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000885 # in some cases (synchronous jobs with run_verify=False), agent may be
886 # None
showard9976ce92008-10-15 20:28:13 +0000887 if agent:
888 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000889
890
jadmanski0afbb632008-06-06 21:10:57 +0000891 def _find_aborting(self):
showardd3dc1992009-04-22 21:01:40 +0000892 for entry in HostQueueEntry.fetch(where='aborted and not complete'):
893 for agent in self.get_agents_for_entry(entry):
894 agent.abort()
895 entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000896
897
showard324bf812009-01-20 23:23:38 +0000898 def _can_start_agent(self, agent, num_started_this_cycle,
899 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000900 # always allow zero-process agents to run
901 if agent.num_processes == 0:
902 return True
903 # don't allow any nonzero-process agents to run after we've reached a
904 # limit (this avoids starvation of many-process agents)
905 if have_reached_limit:
906 return False
907 # total process throttling
showard324bf812009-01-20 23:23:38 +0000908 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000909 return False
910 # if a single agent exceeds the per-cycle throttling, still allow it to
911 # run when it's the first agent in the cycle
912 if num_started_this_cycle == 0:
913 return True
914 # per-cycle throttling
915 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000916 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000917 return False
918 return True
919
920
    def _handle_agents(self):
        """
        Run one tick over all agents: reap finished agents, start eligible
        idle ones (subject to process throttling), and tick live agents.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    # once one agent is throttled, all later nonzero-process
                    # agents are blocked for the rest of the cycle
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000939
940
showard170873e2009-01-07 00:22:26 +0000941class PidfileRunMonitor(object):
942 """
943 Client must call either run() to start a new process or
944 attach_to_existing_process().
945 """
mbligh36768f02008-02-22 18:28:33 +0000946
showard170873e2009-01-07 00:22:26 +0000947 class _PidfileException(Exception):
948 """
949 Raised when there's some unexpected behavior with the pid file, but only
950 used internally (never allowed to escape this class).
951 """
mbligh36768f02008-02-22 18:28:33 +0000952
953
showard170873e2009-01-07 00:22:26 +0000954 def __init__(self):
showard35162b02009-03-03 02:17:30 +0000955 self.lost_process = False
showard170873e2009-01-07 00:22:26 +0000956 self._start_time = None
957 self.pidfile_id = None
958 self._state = drone_manager.PidfileContents()
showard2bab8f42008-11-12 18:15:22 +0000959
960
showard170873e2009-01-07 00:22:26 +0000961 def _add_nice_command(self, command, nice_level):
962 if not nice_level:
963 return command
964 return ['nice', '-n', str(nice_level)] + command
965
966
967 def _set_start_time(self):
968 self._start_time = time.time()
969
970
971 def run(self, command, working_directory, nice_level=None, log_file=None,
972 pidfile_name=None, paired_with_pidfile=None):
973 assert command is not None
974 if nice_level is not None:
975 command = ['nice', '-n', str(nice_level)] + command
976 self._set_start_time()
977 self.pidfile_id = _drone_manager.execute_command(
showardd3dc1992009-04-22 21:01:40 +0000978 command, working_directory, pidfile_name=pidfile_name,
979 log_file=log_file, paired_with_pidfile=paired_with_pidfile)
showard170873e2009-01-07 00:22:26 +0000980
981
showardd3dc1992009-04-22 21:01:40 +0000982 def attach_to_existing_process(self, execution_tag,
983 pidfile_name=_AUTOSERV_PID_FILE):
showard170873e2009-01-07 00:22:26 +0000984 self._set_start_time()
showardd3dc1992009-04-22 21:01:40 +0000985 self.pidfile_id = _drone_manager.get_pidfile_id_from(
986 execution_tag, pidfile_name=pidfile_name)
showard170873e2009-01-07 00:22:26 +0000987 _drone_manager.register_pidfile(self.pidfile_id)
mblighbb421852008-03-11 22:36:16 +0000988
989
jadmanski0afbb632008-06-06 21:10:57 +0000990 def kill(self):
showard170873e2009-01-07 00:22:26 +0000991 if self.has_process():
992 _drone_manager.kill_process(self.get_process())
mblighbb421852008-03-11 22:36:16 +0000993
mbligh36768f02008-02-22 18:28:33 +0000994
showard170873e2009-01-07 00:22:26 +0000995 def has_process(self):
showard21baa452008-10-21 00:08:39 +0000996 self._get_pidfile_info()
showard170873e2009-01-07 00:22:26 +0000997 return self._state.process is not None
showard21baa452008-10-21 00:08:39 +0000998
999
showard170873e2009-01-07 00:22:26 +00001000 def get_process(self):
showard21baa452008-10-21 00:08:39 +00001001 self._get_pidfile_info()
showard35162b02009-03-03 02:17:30 +00001002 assert self._state.process is not None
showard170873e2009-01-07 00:22:26 +00001003 return self._state.process
mblighbb421852008-03-11 22:36:16 +00001004
1005
showard170873e2009-01-07 00:22:26 +00001006 def _read_pidfile(self, use_second_read=False):
1007 assert self.pidfile_id is not None, (
1008 'You must call run() or attach_to_existing_process()')
1009 contents = _drone_manager.get_pidfile_contents(
1010 self.pidfile_id, use_second_read=use_second_read)
1011 if contents.is_invalid():
1012 self._state = drone_manager.PidfileContents()
1013 raise self._PidfileException(contents)
1014 self._state = contents
mbligh90a549d2008-03-25 23:52:34 +00001015
1016
showard21baa452008-10-21 00:08:39 +00001017 def _handle_pidfile_error(self, error, message=''):
showard170873e2009-01-07 00:22:26 +00001018 message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
1019 self._state.process, self.pidfile_id, message)
showardb18134f2009-03-20 20:52:18 +00001020 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001021 email_manager.manager.enqueue_notify_email(error, message)
showard35162b02009-03-03 02:17:30 +00001022 self.on_lost_process(self._state.process)
showard21baa452008-10-21 00:08:39 +00001023
1024
1025 def _get_pidfile_info_helper(self):
showard35162b02009-03-03 02:17:30 +00001026 if self.lost_process:
showard21baa452008-10-21 00:08:39 +00001027 return
mblighbb421852008-03-11 22:36:16 +00001028
showard21baa452008-10-21 00:08:39 +00001029 self._read_pidfile()
mblighbb421852008-03-11 22:36:16 +00001030
showard170873e2009-01-07 00:22:26 +00001031 if self._state.process is None:
1032 self._handle_no_process()
showard21baa452008-10-21 00:08:39 +00001033 return
mbligh90a549d2008-03-25 23:52:34 +00001034
showard21baa452008-10-21 00:08:39 +00001035 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001036 # double check whether or not autoserv is running
showard170873e2009-01-07 00:22:26 +00001037 if _drone_manager.is_process_running(self._state.process):
showard21baa452008-10-21 00:08:39 +00001038 return
mbligh90a549d2008-03-25 23:52:34 +00001039
showard170873e2009-01-07 00:22:26 +00001040 # pid but no running process - maybe process *just* exited
1041 self._read_pidfile(use_second_read=True)
showard21baa452008-10-21 00:08:39 +00001042 if self._state.exit_status is None:
jadmanski0afbb632008-06-06 21:10:57 +00001043 # autoserv exited without writing an exit code
1044 # to the pidfile
showard21baa452008-10-21 00:08:39 +00001045 self._handle_pidfile_error(
1046 'autoserv died without writing exit code')
mblighbb421852008-03-11 22:36:16 +00001047
showard21baa452008-10-21 00:08:39 +00001048
1049 def _get_pidfile_info(self):
1050 """\
1051 After completion, self._state will contain:
1052 pid=None, exit_status=None if autoserv has not yet run
1053 pid!=None, exit_status=None if autoserv is running
1054 pid!=None, exit_status!=None if autoserv has completed
1055 """
1056 try:
1057 self._get_pidfile_info_helper()
showard170873e2009-01-07 00:22:26 +00001058 except self._PidfileException, exc:
showard21baa452008-10-21 00:08:39 +00001059 self._handle_pidfile_error('Pidfile error', traceback.format_exc())
mblighbb421852008-03-11 22:36:16 +00001060
1061
showard170873e2009-01-07 00:22:26 +00001062 def _handle_no_process(self):
jadmanski0afbb632008-06-06 21:10:57 +00001063 """\
1064 Called when no pidfile is found or no pid is in the pidfile.
1065 """
showard170873e2009-01-07 00:22:26 +00001066 message = 'No pid found at %s' % self.pidfile_id
showardb18134f2009-03-20 20:52:18 +00001067 logging.info(message)
showard170873e2009-01-07 00:22:26 +00001068 if time.time() - self._start_time > PIDFILE_TIMEOUT:
1069 email_manager.manager.enqueue_notify_email(
jadmanski0afbb632008-06-06 21:10:57 +00001070 'Process has failed to write pidfile', message)
showard35162b02009-03-03 02:17:30 +00001071 self.on_lost_process()
mbligh90a549d2008-03-25 23:52:34 +00001072
1073
showard35162b02009-03-03 02:17:30 +00001074 def on_lost_process(self, process=None):
jadmanski0afbb632008-06-06 21:10:57 +00001075 """\
1076 Called when autoserv has exited without writing an exit status,
1077 or we've timed out waiting for autoserv to write a pid to the
1078 pidfile. In either case, we just return failure and the caller
1079 should signal some kind of warning.
mbligh90a549d2008-03-25 23:52:34 +00001080
showard170873e2009-01-07 00:22:26 +00001081 process is unimportant here, as it shouldn't be used by anyone.
jadmanski0afbb632008-06-06 21:10:57 +00001082 """
1083 self.lost_process = True
showard170873e2009-01-07 00:22:26 +00001084 self._state.process = process
showard21baa452008-10-21 00:08:39 +00001085 self._state.exit_status = 1
1086 self._state.num_tests_failed = 0
mbligh90a549d2008-03-25 23:52:34 +00001087
1088
jadmanski0afbb632008-06-06 21:10:57 +00001089 def exit_code(self):
showard21baa452008-10-21 00:08:39 +00001090 self._get_pidfile_info()
1091 return self._state.exit_status
1092
1093
1094 def num_tests_failed(self):
1095 self._get_pidfile_info()
1096 assert self._state.num_tests_failed is not None
1097 return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001098
1099
class Agent(object):
    """
    Runs an ordered queue of AgentTasks for the dispatcher.  If a task
    fails, the rest of the queue is dropped and the task's failure_tasks
    are run in a fresh Agent.
    """
    def __init__(self, tasks, num_processes=1):
        # task currently executing, or None
        self.active_task = None
        # pending-task queue; (re)created by _clear_queue()
        self.queue = None
        # set by the dispatcher when this agent is added
        self.dispatcher = None
        # process count used for dispatcher throttling
        self.num_processes = num_processes

        # union of ids across all tasks, for the dispatcher's lookup tables
        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        self._clear_queue()
        for task in tasks:
            self.add_task(task)


    def _clear_queue(self):
        self.queue = Queue.Queue(0)


    def _union_ids(self, id_lists):
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # run tasks back-to-back within one tick: poll the active task and,
        # whenever it completes, immediately start the next one
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()
            # a failed task aborts the rest of this agent's queue
            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        self._clear_queue()
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher
        self._next_task()


    def abort(self):
        # abort the active task and discard everything still queued
        if self.active_task:
            self.active_task.abort()
            self.active_task = None
        self._clear_queue()
1178
1179
class AgentTask(object):
    """
    Base class for a single unit of work executed by an Agent (verify,
    repair, job execution, log gathering, parsing, ...).  Subclasses
    override prolog()/epilog() and usually call run() to launch a
    monitored process.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None,
                 pidfile_name=None, paired_with_pidfile=None):
        """
        @param cmd: command list to execute, or None for a process-less task.
        @param working_directory: directory the command executes in.
        @param failure_tasks: tasks to run (in a fresh Agent) if this task
                fails.  Default is none.  (The old default of [] was a
                shared mutable default; fixed to a per-instance list.)
        @param pidfile_name, paired_with_pidfile: accepted but not stored
                here; run() takes its own equivalents.  NOTE(review):
                possibly dead parameters -- confirm against subclasses
                before removing.
        """
        self.done = False
        self.failure_tasks = failure_tasks if failure_tasks is not None else []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.aborted = False
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/queue entries this task touches, for the
        # dispatcher's agent lookup tables
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # a task with no monitor has no process to watch; it simply fails
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # None means the process is still running
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; subclasses override."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): 'suffix' is accepted but unused -- callers pass
        # values like '.repair' purely for readability; confirm before
        # removing the parameter.
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # preserve the process log in the results repository, if any
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; subclasses override."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.aborted = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamp the filename so repeated tasks don't collide
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all grouped entries must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self, pidfile_name=_AUTOSERV_PID_FILE, paired_with_pidfile=None):
        """Launch self.cmd under a PidfileRunMonitor, if there is one."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file,
                             pidfile_name=pidfile_name,
                             paired_with_pidfile=paired_with_pidfile)
mbligh36768f02008-02-22 18:28:33 +00001307
1308
class RepairTask(AgentTask):
    """
    Runs autoserv -R to repair a host; on failure, marks the host 'Repair
    Failed' and (for non-metahost entries) fails the associated queue entry.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
                                     ['-R', '--host-protection', protection],
                                     queue_entry=queue_entry)
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # put the entry back in the queue while repair runs
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        # mark the associated entry failed, copying repair logs into its
        # results directory and triggering a parse
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001369
1370
class PreJobTask(AgentTask):
    """
    Base for tasks that run before a job on a host (e.g. verify/cleanup):
    on failure, copies the task's log into the entry's results directory.
    Subclasses are expected to set self.queue_entry and self.log_file.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # nothing to preserve on success, for metahost entries (they get
        # reassigned), or when there's no entry at all
        if not self.queue_entry or self.success or self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001383
1384
class VerifyTask(PreJobTask):
    """Run autoserv's verify phase ('-v') against a single host.

    If the verify fails, the attached RepairTask takes over the host.
    """

    def __init__(self, queue_entry=None, host=None):
        # callers supply exactly one of queue_entry / host
        assert bool(queue_entry) != bool(host)
        self.queue_entry = queue_entry
        self.host = host if host else queue_entry.host

        self.create_temp_resultsdir('.verify')
        verify_cmd = _autoserv_command_line(
            self.host.hostname, self.temp_results_dir, ['-v'],
            queue_entry=queue_entry)
        repair_on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=repair_on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Mark the host (and entry, if any) as Verifying."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """A successful verify returns the host to the Ready pool."""
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001415
1416
mbligh36768f02008-02-22 18:28:33 +00001417class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001418 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001419 self.job = job
1420 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001421 super(QueueTask, self).__init__(cmd, self._execution_tag())
1422 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001423
1424
showard170873e2009-01-07 00:22:26 +00001425 def _format_keyval(self, key, value):
1426 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001427
1428
showard73ec0442009-02-07 02:05:20 +00001429 def _keyval_path(self):
1430 return os.path.join(self._execution_tag(), 'keyval')
1431
1432
1433 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1434 keyval_contents = '\n'.join(self._format_keyval(key, value)
1435 for key, value in keyval_dict.iteritems())
1436 # always end with a newline to allow additional keyvals to be written
1437 keyval_contents += '\n'
1438 _drone_manager.attach_file_to_execution(self._execution_tag(),
1439 keyval_contents,
1440 file_path=keyval_path)
1441
1442
1443 def _write_keyvals_before_job(self, keyval_dict):
1444 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1445
1446
1447 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001448 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001449 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001450 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001451 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001452
1453
showard170873e2009-01-07 00:22:26 +00001454 def _write_host_keyvals(self, host):
1455 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1456 host.hostname)
1457 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001458 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1459 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001460
1461
showard170873e2009-01-07 00:22:26 +00001462 def _execution_tag(self):
1463 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001464
1465
jadmanski0afbb632008-06-06 21:10:57 +00001466 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001467 queued = int(time.mktime(self.job.created_on.timetuple()))
1468 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001469 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001470 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001471 queue_entry.set_status('Running')
1472 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001473 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001474 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001475 assert len(self.queue_entries) == 1
1476 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001477
1478
showard35162b02009-03-03 02:17:30 +00001479 def _write_lost_process_error_file(self):
1480 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1481 _drone_manager.write_lines_to_file(error_file_path,
1482 [_LOST_PROCESS_ERROR])
1483
1484
showardd3dc1992009-04-22 21:01:40 +00001485 def _finish_task(self):
1486 # both of these conditionals can be true, iff the process ran, wrote a
1487 # pid to its pidfile, and then exited without writing an exit code
showard35162b02009-03-03 02:17:30 +00001488 if self.monitor.has_process():
1489 self._write_keyval_after_job("job_finished", int(time.time()))
showardd3dc1992009-04-22 21:01:40 +00001490 gather_task = GatherLogsTask(self.job, self.queue_entries)
1491 self.agent.dispatcher.add_agent(Agent(tasks=[gather_task]))
showard35162b02009-03-03 02:17:30 +00001492
1493 if self.monitor.lost_process:
1494 self._write_lost_process_error_file()
1495 for queue_entry in self.queue_entries:
1496 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001497
1498
showardcbd74612008-11-19 21:42:02 +00001499 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001500 _drone_manager.write_lines_to_file(
1501 os.path.join(self._execution_tag(), 'status.log'),
1502 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001503 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001504
1505
jadmanskif7fa2cc2008-10-01 14:13:23 +00001506 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001507 if not self.monitor or not self.monitor.has_process():
1508 return
1509
jadmanskif7fa2cc2008-10-01 14:13:23 +00001510 # build up sets of all the aborted_by and aborted_on values
1511 aborted_by, aborted_on = set(), set()
1512 for queue_entry in self.queue_entries:
1513 if queue_entry.aborted_by:
1514 aborted_by.add(queue_entry.aborted_by)
1515 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1516 aborted_on.add(t)
1517
1518 # extract some actual, unique aborted by value and write it out
1519 assert len(aborted_by) <= 1
1520 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001521 aborted_by_value = aborted_by.pop()
1522 aborted_on_value = max(aborted_on)
1523 else:
1524 aborted_by_value = 'autotest_system'
1525 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001526
showarda0382352009-02-11 23:36:43 +00001527 self._write_keyval_after_job("aborted_by", aborted_by_value)
1528 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001529
showardcbd74612008-11-19 21:42:02 +00001530 aborted_on_string = str(datetime.datetime.fromtimestamp(
1531 aborted_on_value))
1532 self._write_status_comment('Job aborted by %s on %s' %
1533 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001534
1535
jadmanski0afbb632008-06-06 21:10:57 +00001536 def abort(self):
1537 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001538 self._log_abort()
showardd3dc1992009-04-22 21:01:40 +00001539 self._finish_task()
showard21baa452008-10-21 00:08:39 +00001540
1541
jadmanski0afbb632008-06-06 21:10:57 +00001542 def epilog(self):
1543 super(QueueTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001544 self._finish_task()
1545 logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001546
1547
mblighbb421852008-03-11 22:36:16 +00001548class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001549 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001550 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001551 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001552
1553
jadmanski0afbb632008-06-06 21:10:57 +00001554 def run(self):
1555 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001556
1557
jadmanski0afbb632008-06-06 21:10:57 +00001558 def prolog(self):
1559 # recovering an existing process - don't do prolog
1560 pass
mblighbb421852008-03-11 22:36:16 +00001561
1562
showardd3dc1992009-04-22 21:01:40 +00001563class PostJobTask(AgentTask):
1564 def __init__(self, queue_entries, pidfile_name, logfile_name,
1565 run_monitor=None):
1566 """
1567 If run_monitor != None, we're recovering a running task.
1568 """
1569 self._queue_entries = queue_entries
1570 self._pidfile_name = pidfile_name
1571 self._run_monitor = run_monitor
1572
1573 self._execution_tag = self._get_consistent_execution_tag(queue_entries)
1574 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1575 self._autoserv_monitor = PidfileRunMonitor()
1576 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1577 self._paired_with_pidfile = self._autoserv_monitor.pidfile_id
1578
1579 if _testing_mode:
1580 command = 'true'
1581 else:
1582 command = self._generate_command(self._results_dir)
1583
1584 super(PostJobTask, self).__init__(cmd=command,
1585 working_directory=self._execution_tag)
1586
1587 self.log_file = os.path.join(self._execution_tag, logfile_name)
1588 self._final_status = self._determine_final_status()
1589
1590
1591 def _generate_command(self, results_dir):
1592 raise NotImplementedError('Subclasses must override this')
1593
1594
1595 def _job_was_aborted(self):
1596 was_aborted = None
1597 for queue_entry in self._queue_entries:
1598 queue_entry.update_from_database()
1599 if was_aborted is None: # first queue entry
1600 was_aborted = bool(queue_entry.aborted)
1601 elif was_aborted != bool(queue_entry.aborted): # subsequent entries
1602 email_manager.manager.enqueue_notify_email(
1603 'Inconsistent abort state',
1604 'Queue entries have inconsistent abort state: ' +
1605 ', '.join('%s (%s)' % (queue_entry, queue_entry.aborted)))
1606 # don't crash here, just assume true
1607 return True
1608 return was_aborted
1609
1610
1611 def _determine_final_status(self):
1612 if self._job_was_aborted():
1613 return models.HostQueueEntry.Status.ABORTED
1614
1615 # we'll use a PidfileRunMonitor to read the autoserv exit status
1616 if self._autoserv_monitor.exit_code() == 0:
1617 return models.HostQueueEntry.Status.COMPLETED
1618 return models.HostQueueEntry.Status.FAILED
1619
1620
1621 def run(self):
1622 if self._run_monitor is not None:
1623 self.monitor = self._run_monitor
1624 else:
1625 # make sure we actually have results to work with.
1626 # this should never happen in normal operation.
1627 if not self._autoserv_monitor.has_process():
1628 email_manager.manager.enqueue_notify_email(
1629 'No results in post-job task',
1630 'No results in post-job task at %s' %
1631 self._autoserv_monitor.pidfile_id)
1632 self.finished(False)
1633 return
1634
1635 super(PostJobTask, self).run(
1636 pidfile_name=self._pidfile_name,
1637 paired_with_pidfile=self._paired_with_pidfile)
1638
1639
1640 def _set_all_statuses(self, status):
1641 for queue_entry in self._queue_entries:
1642 queue_entry.set_status(status)
1643
1644
1645 def abort(self):
1646 # override AgentTask.abort() to avoid killing the process and ending
1647 # the task. post-job tasks continue when the job is aborted.
1648 pass
1649
1650
class GatherLogsTask(PostJobTask):
    """Post-job task that collects crash info and wraps up the job's hosts.

    Responsible for:
    * gathering uncollected logs (if Autoserv crashed hard or was killed)
    * copying logs to the results repository
    * spawning CleanupTasks for hosts, if necessary
    * spawning a FinalReparseTask for the job
    """
    def __init__(self, job, queue_entries, run_monitor=None):
        self._job = job
        super(GatherLogsTask, self).__init__(
            queue_entries, pidfile_name=_CRASHINFO_PID_FILE,
            logfile_name='.collect_crashinfo.log', run_monitor=run_monitor)
        self._set_ids(queue_entries=queue_entries)


    def _generate_command(self, results_dir):
        """Build the autoserv --collect-crashinfo command line."""
        hostnames = [entry.host.hostname for entry in self._queue_entries]
        return [_autoserv_path, '-p', '--collect-crashinfo', '-m',
                ','.join(hostnames), '-r', results_dir]


    def prolog(self):
        super(GatherLogsTask, self).prolog()
        self._set_all_statuses(models.HostQueueEntry.Status.GATHERING)


    def _should_reboot(self):
        """Evaluate the job's reboot_after policy for this run."""
        policy = self._job.reboot_after
        if policy == models.RebootAfter.ALWAYS:
            return True
        if policy == models.RebootAfter.IF_ALL_TESTS_PASSED:
            completed = (
                self._final_status == models.HostQueueEntry.Status.COMPLETED)
            num_tests_failed = self._autoserv_monitor.num_tests_failed()
            return completed and num_tests_failed == 0
        return False


    def _reboot_hosts(self):
        """Reboot (via CleanupTask) or release each of the job's hosts."""
        do_reboot = self._should_reboot()
        for queue_entry in self._queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.host)
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(GatherLogsTask, self).epilog()
        self._copy_and_parse_results(self._queue_entries)
        self._reboot_hosts()
1704
1705
showard8fe93b52008-11-18 17:53:22 +00001706class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001707 def __init__(self, host=None, queue_entry=None):
1708 assert bool(host) ^ bool(queue_entry)
1709 if queue_entry:
1710 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001711 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001712 self.host = host
showard170873e2009-01-07 00:22:26 +00001713
1714 self.create_temp_resultsdir('.cleanup')
showard87ba02a2009-04-20 19:37:32 +00001715 self.cmd = _autoserv_command_line(host.hostname, self.temp_results_dir,
1716 ['--cleanup'],
1717 queue_entry=queue_entry)
showarde788ea62008-11-17 21:02:47 +00001718 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001719 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1720 failure_tasks=[repair_task])
1721
1722 self._set_ids(host=host, queue_entries=[queue_entry])
1723 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001724
mblighd5c95802008-03-05 00:33:46 +00001725
jadmanski0afbb632008-06-06 21:10:57 +00001726 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001727 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001728 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001729 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001730
mblighd5c95802008-03-05 00:33:46 +00001731
showard21baa452008-10-21 00:08:39 +00001732 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001733 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001734 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001735 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001736 self.host.update_field('dirty', 0)
1737
1738
showardd3dc1992009-04-22 21:01:40 +00001739class FinalReparseTask(PostJobTask):
showard97aed502008-11-04 02:01:24 +00001740 _num_running_parses = 0
1741
showardd3dc1992009-04-22 21:01:40 +00001742 def __init__(self, queue_entries, run_monitor=None):
1743 super(FinalReparseTask, self).__init__(queue_entries,
1744 pidfile_name=_PARSER_PID_FILE,
1745 logfile_name='.parse.log',
1746 run_monitor=run_monitor)
showard170873e2009-01-07 00:22:26 +00001747 # don't use _set_ids, since we don't want to set the host_ids
1748 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001749 self._parse_started = False
1750
showard97aed502008-11-04 02:01:24 +00001751
1752 @classmethod
1753 def _increment_running_parses(cls):
1754 cls._num_running_parses += 1
1755
1756
1757 @classmethod
1758 def _decrement_running_parses(cls):
1759 cls._num_running_parses -= 1
1760
1761
1762 @classmethod
1763 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001764 return (cls._num_running_parses <
1765 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001766
1767
1768 def prolog(self):
1769 super(FinalReparseTask, self).prolog()
showardd3dc1992009-04-22 21:01:40 +00001770 self._set_all_statuses(models.HostQueueEntry.Status.PARSING)
showard97aed502008-11-04 02:01:24 +00001771
1772
1773 def epilog(self):
1774 super(FinalReparseTask, self).epilog()
showardd3dc1992009-04-22 21:01:40 +00001775 self._set_all_statuses(self._final_status)
showard97aed502008-11-04 02:01:24 +00001776
1777
showardd3dc1992009-04-22 21:01:40 +00001778 def _generate_command(self, results_dir):
showard170873e2009-01-07 00:22:26 +00001779 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
showardd3dc1992009-04-22 21:01:40 +00001780 results_dir]
showard97aed502008-11-04 02:01:24 +00001781
1782
1783 def poll(self):
1784 # override poll to keep trying to start until the parse count goes down
1785 # and we can, at which point we revert to default behavior
1786 if self._parse_started:
1787 super(FinalReparseTask, self).poll()
1788 else:
1789 self._try_starting_parse()
1790
1791
1792 def run(self):
1793 # override run() to not actually run unless we can
1794 self._try_starting_parse()
1795
1796
1797 def _try_starting_parse(self):
1798 if not self._can_run_new_parse():
1799 return
showard170873e2009-01-07 00:22:26 +00001800
showard97aed502008-11-04 02:01:24 +00001801 # actually run the parse command
showardd3dc1992009-04-22 21:01:40 +00001802 super(FinalReparseTask, self).run()
showard170873e2009-01-07 00:22:26 +00001803
showard97aed502008-11-04 02:01:24 +00001804 self._increment_running_parses()
1805 self._parse_started = True
1806
1807
1808 def finished(self, success):
1809 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001810 if self._parse_started:
1811 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001812
1813
showardc9ae1782009-01-30 01:42:37 +00001814class SetEntryPendingTask(AgentTask):
1815 def __init__(self, queue_entry):
1816 super(SetEntryPendingTask, self).__init__(cmd='')
1817 self._queue_entry = queue_entry
1818 self._set_ids(queue_entries=[queue_entry])
1819
1820
1821 def run(self):
1822 agent = self._queue_entry.on_pending()
1823 if agent:
1824 self.agent.dispatcher.add_agent(agent)
1825 self.finished(True)
1826
1827
showarda3c58572009-03-12 20:36:59 +00001828class DBError(Exception):
1829 """Raised by the DBObject constructor when its select fails."""
1830
1831
mbligh36768f02008-02-22 18:28:33 +00001832class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001833 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001834
1835 # Subclasses MUST override these:
1836 _table_name = ''
1837 _fields = ()
1838
showarda3c58572009-03-12 20:36:59 +00001839 # A mapping from (type, id) to the instance of the object for that
1840 # particular id. This prevents us from creating new Job() and Host()
1841 # instances for every HostQueueEntry object that we instantiate as
1842 # multiple HQEs often share the same Job.
1843 _instances_by_type_and_id = weakref.WeakValueDictionary()
1844 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001845
showarda3c58572009-03-12 20:36:59 +00001846
1847 def __new__(cls, id=None, **kwargs):
1848 """
1849 Look to see if we already have an instance for this particular type
1850 and id. If so, use it instead of creating a duplicate instance.
1851 """
1852 if id is not None:
1853 instance = cls._instances_by_type_and_id.get((cls, id))
1854 if instance:
1855 return instance
1856 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1857
1858
1859 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001860 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001861 assert self._table_name, '_table_name must be defined in your class'
1862 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001863 if not new_record:
1864 if self._initialized and not always_query:
1865 return # We've already been initialized.
1866 if id is None:
1867 id = row[0]
1868 # Tell future constructors to use us instead of re-querying while
1869 # this instance is still around.
1870 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001871
showard6ae5ea92009-02-25 00:11:51 +00001872 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001873
jadmanski0afbb632008-06-06 21:10:57 +00001874 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001875
jadmanski0afbb632008-06-06 21:10:57 +00001876 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001877 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001878
showarda3c58572009-03-12 20:36:59 +00001879 if self._initialized:
1880 differences = self._compare_fields_in_row(row)
1881 if differences:
showard7629f142009-03-27 21:02:02 +00001882 logging.warn(
1883 'initialized %s %s instance requery is updating: %s',
1884 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001885 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001886 self._initialized = True
1887
1888
1889 @classmethod
1890 def _clear_instance_cache(cls):
1891 """Used for testing, clear the internal instance cache."""
1892 cls._instances_by_type_and_id.clear()
1893
1894
showardccbd6c52009-03-21 00:10:21 +00001895 def _fetch_row_from_db(self, row_id):
1896 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1897 rows = _db.execute(sql, (row_id,))
1898 if not rows:
showard76e29d12009-04-15 21:53:10 +00001899 raise DBError("row not found (table=%s, row id=%s)"
1900 % (self.__table, row_id))
showardccbd6c52009-03-21 00:10:21 +00001901 return rows[0]
1902
1903
showarda3c58572009-03-12 20:36:59 +00001904 def _assert_row_length(self, row):
1905 assert len(row) == len(self._fields), (
1906 "table = %s, row = %s/%d, fields = %s/%d" % (
1907 self.__table, row, len(row), self._fields, len(self._fields)))
1908
1909
1910 def _compare_fields_in_row(self, row):
1911 """
1912 Given a row as returned by a SELECT query, compare it to our existing
1913 in memory fields.
1914
1915 @param row - A sequence of values corresponding to fields named in
1916 The class attribute _fields.
1917
1918 @returns A dictionary listing the differences keyed by field name
1919 containing tuples of (current_value, row_value).
1920 """
1921 self._assert_row_length(row)
1922 differences = {}
1923 for field, row_value in itertools.izip(self._fields, row):
1924 current_value = getattr(self, field)
1925 if current_value != row_value:
1926 differences[field] = (current_value, row_value)
1927 return differences
showard2bab8f42008-11-12 18:15:22 +00001928
1929
1930 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001931 """
1932 Update our field attributes using a single row returned by SELECT.
1933
1934 @param row - A sequence of values corresponding to fields named in
1935 the class fields list.
1936 """
1937 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001938
showard2bab8f42008-11-12 18:15:22 +00001939 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001940 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001941 setattr(self, field, value)
1942 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001943
showard2bab8f42008-11-12 18:15:22 +00001944 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001945
mblighe2586682008-02-29 22:45:46 +00001946
showardccbd6c52009-03-21 00:10:21 +00001947 def update_from_database(self):
1948 assert self.id is not None
1949 row = self._fetch_row_from_db(self.id)
1950 self._update_fields_from_row(row)
1951
1952
jadmanski0afbb632008-06-06 21:10:57 +00001953 def count(self, where, table = None):
1954 if not table:
1955 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001956
jadmanski0afbb632008-06-06 21:10:57 +00001957 rows = _db.execute("""
1958 SELECT count(*) FROM %s
1959 WHERE %s
1960 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001961
jadmanski0afbb632008-06-06 21:10:57 +00001962 assert len(rows) == 1
1963
1964 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001965
1966
showardd3dc1992009-04-22 21:01:40 +00001967 def update_field(self, field, value):
showard2bab8f42008-11-12 18:15:22 +00001968 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001969
showard2bab8f42008-11-12 18:15:22 +00001970 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001971 return
mbligh36768f02008-02-22 18:28:33 +00001972
mblighf8c624d2008-07-03 16:58:45 +00001973 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
jadmanski0afbb632008-06-06 21:10:57 +00001974 _db.execute(query, (value, self.id))
1975
showard2bab8f42008-11-12 18:15:22 +00001976 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001977
1978
jadmanski0afbb632008-06-06 21:10:57 +00001979 def save(self):
1980 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001981 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001982 columns = ','.join([str(key) for key in keys])
showard76e29d12009-04-15 21:53:10 +00001983 values = []
1984 for key in keys:
1985 value = getattr(self, key)
1986 if value is None:
1987 values.append('NULL')
1988 else:
1989 values.append('"%s"' % value)
showard89f84db2009-03-12 20:39:13 +00001990 values_str = ','.join(values)
1991 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1992 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00001993 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00001994 # Update our id to the one the database just assigned to us.
1995 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00001996
1997
jadmanski0afbb632008-06-06 21:10:57 +00001998 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001999 self._instances_by_type_and_id.pop((type(self), id), None)
2000 self._initialized = False
2001 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00002002 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
2003 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00002004
2005
showard63a34772008-08-18 19:32:50 +00002006 @staticmethod
2007 def _prefix_with(string, prefix):
2008 if string:
2009 string = prefix + string
2010 return string
2011
2012
jadmanski0afbb632008-06-06 21:10:57 +00002013 @classmethod
showard989f25d2008-10-01 11:38:11 +00002014 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00002015 """
2016 Construct instances of our class based on the given database query.
2017
2018 @yields One class instance for each row fetched.
2019 """
showard63a34772008-08-18 19:32:50 +00002020 order_by = cls._prefix_with(order_by, 'ORDER BY ')
2021 where = cls._prefix_with(where, 'WHERE ')
2022 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00002023 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00002024 'joins' : joins,
2025 'where' : where,
2026 'order_by' : order_by})
2027 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00002028 for row in rows:
2029 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00002030
mbligh36768f02008-02-22 18:28:33 +00002031
class IneligibleHostQueue(DBObject):
    # Row wrapper for the ineligible_host_queues table (job_id/host_id pairs).
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00002035
2036
showard89f84db2009-03-12 20:39:13 +00002037class AtomicGroup(DBObject):
2038 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00002039 _fields = ('id', 'name', 'description', 'max_number_of_machines',
2040 'invalid')
showard89f84db2009-03-12 20:39:13 +00002041
2042
showard989f25d2008-10-01 11:38:11 +00002043class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002044 _table_name = 'labels'
2045 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00002046 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00002047
2048
mbligh36768f02008-02-22 18:28:33 +00002049class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002050 _table_name = 'hosts'
2051 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
2052 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
2053
2054
jadmanski0afbb632008-06-06 21:10:57 +00002055 def current_task(self):
2056 rows = _db.execute("""
2057 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2058 """, (self.id,))
2059
2060 if len(rows) == 0:
2061 return None
2062 else:
2063 assert len(rows) == 1
2064 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002065 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002066
2067
jadmanski0afbb632008-06-06 21:10:57 +00002068 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002069 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002070 if self.current_task():
2071 self.current_task().requeue()
2072
showard6ae5ea92009-02-25 00:11:51 +00002073
jadmanski0afbb632008-06-06 21:10:57 +00002074 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002075 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002076 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002077
2078
showard170873e2009-01-07 00:22:26 +00002079 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002080 """
showard170873e2009-01-07 00:22:26 +00002081 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002082 """
2083 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002084 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002085 FROM labels
2086 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002087 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002088 ORDER BY labels.name
2089 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002090 platform = None
2091 all_labels = []
2092 for label_name, is_platform in rows:
2093 if is_platform:
2094 platform = label_name
2095 all_labels.append(label_name)
2096 return platform, all_labels
2097
2098
2099 def reverify_tasks(self):
2100 cleanup_task = CleanupTask(host=self)
2101 verify_task = VerifyTask(host=self)
2102 # just to make sure this host does not get taken away
2103 self.set_status('Cleaning')
2104 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002105
2106
class HostQueueEntry(DBObject):
    """
    A row of host_queue_entries: the pairing of a Job with a Host (or a
    not-yet-assigned metahost / atomic group) plus its scheduling state.
    """
    _table_name = 'host_queue_entries'
    # Field order must match the table's column order (consumed by DBObject).
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id', 'aborted')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the entry by id or raw row; also loads its Job and Host."""
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        # host_id may be unset for entries that have no host assigned yet
        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # per-entry log file, written via the drone manager
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        """Frontend URL for viewing this entry's job (used in emails)."""
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """
        Assign this entry to the given host (blocking the host against the
        job), or release the current host when host is None.
        """
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        """Return the Host assigned to this entry (may be None)."""
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Make the host ineligible for further entries of this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host ineligibility blocks created by block_host."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        """Set the results subdir; defaults to the assigned host's hostname."""
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        """Hostname of the assigned host, or 'no host' when unassigned."""
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update this entry's status, keeping the active/complete flags in
        sync, and send notification email when configured to.
        """
        self.update_field('status', status)

        logging.info("%s -> %s", self, self.status)

        # "not yet started" states
        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        # "in progress" states
        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Gathering']:
            self.update_field('complete', False)
            self.update_field('active', True)

        # terminal states
        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this entry's new status."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """If the whole job has finished, email a per-host status summary."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """
        Start this entry running.  Metahost / atomic-group entries must be
        given a concrete host here.  Returns whatever self.job.run returns
        (an Agent for the dispatcher).
        """
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Send the entry back to Queued (releasing a metahost's host)."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        # login of the user who aborted this entry (None if not aborted)
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        # timestamp of the abort (None if not aborted)
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # cached after the first call via the _aborted_by attribute
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify.  If the
        job is ready to run, returns an agent to run the job.  Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher):
        """
        Abort this entry.  If a job agent is still running, defer to it;
        otherwise clean up the host as appropriate and mark Aborted.
        """
        assert self.aborted and not self.complete

        Status = models.HostQueueEntry.Status
        has_running_job_agent = (
            self.status in (Status.RUNNING, Status.GATHERING, Status.PARSING)
            and dispatcher.get_agents_for_entry(self))
        if has_running_job_agent:
            # do nothing; post-job tasks will finish and then mark this entry
            # with status "Aborted" and take care of the host
            return

        if self.status in (Status.STARTING, Status.PENDING):
            self.host.set_status(models.Host.Status.READY)
        elif self.status == Status.VERIFYING:
            dispatcher.add_agent(Agent(tasks=self.host.reverify_tasks()))

        self.set_status(Status.ABORTED)

    def execution_tag(self):
        """Return '<job id>-<owner>/<subdir>'; requires execution_subdir set."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002361
2362
class Job(DBObject):
    """
    A row of the jobs table, plus the scheduling logic to launch its queue
    entries: pre-job cleanup/verify tasks, grouping entries of synchronous
    jobs, and building the autoserv command line.
    """
    _table_name = 'jobs'
    # Field order must match the table's column order (consumed by DBObject).
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        """Load the job by id or from a raw database row."""
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        """
        True unless this is a client-side job.

        NOTE(review): 2 is presumably the client value of the control_type
        enum -- confirm and replace this magic number with the symbolic
        constant from the frontend models.
        """
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag used as the job's results dir."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects for this job (at least one)."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """
        Set the job's status; optionally propagate it to every queue entry.
        """
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        """True when enough entries are Pending to satisfy synch_count."""
        pending_entries = models.HostQueueEntry.objects.filter(
            job=self.id, status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """
        Count this job's queue entries, optionally restricted by an extra
        SQL condition (e.g. 'active', 'not complete').
        """
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        """Number of queue entries not yet complete."""
        return self.num_machines('not complete')


    def num_active(self):
        """Number of currently active queue entries."""
        return self.num_machines('active')


    def num_complete(self):
        """Number of completed queue entries."""
        return self.num_machines('complete')


    def is_finished(self):
        """True when every one of this job's queue entries is complete."""
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """
        Django queryset of this job's Queued/Pending (and, by default,
        Verifying) entries.
        """
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-running entries Stopped, releasing their hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                # free the host that was being held for this entry
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        """Stop the job when it can no longer gather synch_count hosts."""
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to the job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir for this job."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Attach the control file to the execution; return its drone path."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing the given entry's execution_subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for the given group of entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = _autoserv_command_line(
            hostnames, execution_tag,
            ['-P', execution_tag, '-n',
             _drone_manager.absolute_path(control_path)],
            job=self)

        if not self.is_server_job():
            # -c makes autoserv run the control file client-side
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Whether a pre-job CleanupTask is needed, per reboot_before."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Whether a pre-job VerifyTask is needed; host protection wins."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Tasks before the job proper: cleanup?, verify?, then pending."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give the entries a shared execution subdir (hostname or groupN)."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """
        Pick up to synch_count Pending entries (always including the given
        one) and assign them a new execution group.
        """
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """
        Run (or start pre-job tasks for) the given entry.  Returns an Agent
        for the dispatcher.
        """
        if not self.is_ready():
            # not enough Pending entries yet -- run the pre-job task chain
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=None):
        """
        Mark the entries Starting and return an Agent that runs
        initial_tasks (if any) followed by the autoserv QueueTask.

        The original signature used a mutable-list default (initial_tasks=[]);
        None is used instead to avoid the shared-default pitfall, with the
        same behavior for all existing callers.
        """
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = list(initial_tasks or []) + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
2585
if __name__ == '__main__':
    # Script entry point: main() (defined earlier in this file) starts the
    # scheduler loop.
    main()