blob: f62bc7c9716809ccc85bc6716232ab86d1d64ece [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
# Module-level configuration and global state for the scheduler.
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Default to the parent of this file's directory; overridable via environment.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

if os.environ.has_key('AUTOTEST_DIR'):
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# Process-wide mutable globals, initialized by main()/init().
_db = None                 # DatabaseConnection, set up in init()
_shutdown = False          # set True by the SIGINT handler to stop the loop
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False      # set by --test; uses dummy autoserv + test DB
_base_url = None           # frontend URL, resolved in main()
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
63
mbligh36768f02008-02-22 18:28:33 +000064
65def main():
jadmanski0afbb632008-06-06 21:10:57 +000066 usage = 'usage: %prog [options] results_dir'
mbligh36768f02008-02-22 18:28:33 +000067
jadmanski0afbb632008-06-06 21:10:57 +000068 parser = optparse.OptionParser(usage)
69 parser.add_option('--recover-hosts', help='Try to recover dead hosts',
70 action='store_true')
71 parser.add_option('--logfile', help='Set a log file that all stdout ' +
72 'should be redirected to. Stderr will go to this ' +
73 'file + ".err"')
74 parser.add_option('--test', help='Indicate that scheduler is under ' +
75 'test and should use dummy autoserv and no parsing',
76 action='store_true')
77 (options, args) = parser.parse_args()
78 if len(args) != 1:
79 parser.print_usage()
80 return
mbligh36768f02008-02-22 18:28:33 +000081
jadmanski0afbb632008-06-06 21:10:57 +000082 global RESULTS_DIR
83 RESULTS_DIR = args[0]
mbligh36768f02008-02-22 18:28:33 +000084
showardcca334f2009-03-12 20:38:34 +000085 # Change the cwd while running to avoid issues incase we were launched from
86 # somewhere odd (such as a random NFS home directory of the person running
87 # sudo to launch us as the appropriate user).
88 os.chdir(RESULTS_DIR)
89
jadmanski0afbb632008-06-06 21:10:57 +000090 c = global_config.global_config
showardd1ee1dd2009-01-07 21:33:08 +000091 notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
92 "notify_email_statuses",
93 default='')
showardc85c21b2008-11-24 22:17:37 +000094 global _notify_email_statuses
showard170873e2009-01-07 00:22:26 +000095 _notify_email_statuses = [status for status in
96 re.split(r'[\s,;:]', notify_statuses_list.lower())
97 if status]
showardc85c21b2008-11-24 22:17:37 +000098
jadmanski0afbb632008-06-06 21:10:57 +000099 if options.test:
100 global _autoserv_path
101 _autoserv_path = 'autoserv_dummy'
102 global _testing_mode
103 _testing_mode = True
mbligh36768f02008-02-22 18:28:33 +0000104
mbligh37eceaa2008-12-15 22:56:37 +0000105 # AUTOTEST_WEB.base_url is still a supported config option as some people
106 # may wish to override the entire url.
showard542e8402008-09-19 20:16:18 +0000107 global _base_url
showard170873e2009-01-07 00:22:26 +0000108 config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
109 default='')
mbligh37eceaa2008-12-15 22:56:37 +0000110 if config_base_url:
111 _base_url = config_base_url
showard542e8402008-09-19 20:16:18 +0000112 else:
mbligh37eceaa2008-12-15 22:56:37 +0000113 # For the common case of everything running on a single server you
114 # can just set the hostname in a single place in the config file.
115 server_name = c.get_config_value('SERVER', 'hostname')
116 if not server_name:
showardb18134f2009-03-20 20:52:18 +0000117 logging.critical('[SERVER] hostname missing from the config file.')
mbligh37eceaa2008-12-15 22:56:37 +0000118 sys.exit(1)
119 _base_url = 'http://%s/afe/' % server_name
showard542e8402008-09-19 20:16:18 +0000120
showardc5afc462009-01-13 00:09:39 +0000121 server = status_server.StatusServer(_drone_manager)
showardd1ee1dd2009-01-07 21:33:08 +0000122 server.start()
123
jadmanski0afbb632008-06-06 21:10:57 +0000124 try:
showardc5afc462009-01-13 00:09:39 +0000125 init(options.logfile)
126 dispatcher = Dispatcher()
127 dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)
128
jadmanski0afbb632008-06-06 21:10:57 +0000129 while not _shutdown:
130 dispatcher.tick()
showardd1ee1dd2009-01-07 21:33:08 +0000131 time.sleep(scheduler_config.config.tick_pause_sec)
jadmanski0afbb632008-06-06 21:10:57 +0000132 except:
showard170873e2009-01-07 00:22:26 +0000133 email_manager.manager.log_stacktrace(
134 "Uncaught exception; terminating monitor_db")
jadmanski0afbb632008-06-06 21:10:57 +0000135
showard170873e2009-01-07 00:22:26 +0000136 email_manager.manager.send_queued_emails()
showard55b4b542009-01-08 23:30:30 +0000137 server.shutdown()
showard170873e2009-01-07 00:22:26 +0000138 _drone_manager.shutdown()
jadmanski0afbb632008-06-06 21:10:57 +0000139 _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000140
141
def handle_sigint(signum, frame):
    """SIGINT handler: request a graceful stop of the dispatcher loop."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000146
147
def init(logfile):
    """
    One-time scheduler initialization.

    Optionally redirects stdio to *logfile*, connects the global database
    (switching to the stress-test DB in testing mode), enables Django
    autocommit, installs the SIGINT handler, and initializes the global
    drone manager from the configured drone/results hosts.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    # make autoserv (and friends) in the server dir take precedence on PATH
    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # comma-separated drone host list; results_host receives final results
    drones = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000177
178
def enable_logging(logfile):
    """
    Redirect this process's stdout/stderr to append-mode, unbuffered files:
    *logfile* for stdout and *logfile*.err for stderr.  Both the OS-level
    descriptors (via dup2, so child processes inherit them) and the Python
    sys.stdout/sys.stderr objects are replaced.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    out_fd = open(out_file, "a", buffering=0)
    err_fd = open(err_file, "a", buffering=0)

    os.dup2(out_fd.fileno(), sys.stdout.fileno())
    os.dup2(err_fd.fileno(), sys.stderr.fileno())

    sys.stdout = out_fd
    sys.stderr = err_fd
mbligh36768f02008-02-22 18:28:33 +0000191
192
def queue_entries_to_abort():
    """Fetch all host queue entries currently marked with status 'Abort'."""
    rows = _db.execute("""
                    SELECT * FROM host_queue_entries WHERE status='Abort';
                    """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000200
showard7cf9a9b2008-05-15 21:15:52 +0000201
class SchedulerError(Exception):
    """Error raised by HostScheduler upon detecting an inconsistent state."""
204
205
class HostScheduler(object):
    """
    Matches pending HostQueueEntries against ready hosts.

    refresh() snapshots all ready hosts plus the ACL/label/dependency
    relationships relevant to the pending entries into in-memory dicts;
    the find_* methods then consume that cached state, popping each host
    from the availability pool as it is handed out so a host is used at
    most once per scheduling cycle.
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        # map host id -> Host object for O(1) lookup/removal
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # comma-separated id string suitable for an SQL "IN (...)" clause
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # Run `query` (its %s filled with id_list) and fold the two-column
        # result into {left_id: set(right_ids)}; `flip` swaps the columns.
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # {job_id: set(aclgroup ids of the job's owner)}
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # {job_id: set(host ids the job must not run on)}
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # {job_id: set(label ids the job depends on)}
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # {host_id: set(aclgroup ids)}
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        # Returns ({label_id: set(host_ids)}, {host_id: set(label_ids)})
        # built from a single hosts_labels query.
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # {label_id: Label object} for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Snapshot host/ACL/label/dependency state for this cycle."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # the host must carry every label the job depends on
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # combined ACL + dependency + only_if_needed + atomic-group check
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # entry already names its host; just validate and claim it
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # pick the first usable, eligible host carrying the metahost label
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this entry, or None if none is ready."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        # atomic-group entries go through find_eligible_atomic_group instead
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            logging.error(
                'Error: job %d synch_count=%d > requested atomic_group %d '
                'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                job.id, job.synch_count, atomic_group.id,
                atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
550
551
showard170873e2009-01-07 00:22:26 +0000552class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000553 def __init__(self):
554 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000555 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000556 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000557 self._host_agents = {}
558 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000559
mbligh36768f02008-02-22 18:28:33 +0000560
jadmanski0afbb632008-06-06 21:10:57 +0000561 def do_initial_recovery(self, recover_hosts=True):
562 # always recover processes
563 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000564
jadmanski0afbb632008-06-06 21:10:57 +0000565 if recover_hosts:
566 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000567
568
    def tick(self):
        # One scheduler iteration: sync state from the drones, run periodic
        # cleanup if due, process aborts, schedule new work, let agents make
        # progress, then flush queued drone actions and notification email.
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000577
showard97aed502008-11-04 02:01:24 +0000578
showarda3ab0d52008-11-03 19:03:47 +0000579 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000580 should_cleanup = (self._last_clean_time +
581 scheduler_config.config.clean_interval * 60 <
582 time.time())
583 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000584 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000585 self._abort_timed_out_jobs()
586 self._abort_jobs_past_synch_start_timeout()
587 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000588 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000589 self._last_clean_time = time.time()
590
mbligh36768f02008-02-22 18:28:33 +0000591
showard170873e2009-01-07 00:22:26 +0000592 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
593 for object_id in object_ids:
594 agent_dict.setdefault(object_id, set()).add(agent)
595
596
597 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
598 for object_id in object_ids:
599 assert object_id in agent_dict
600 agent_dict[object_id].remove(agent)
601
602
jadmanski0afbb632008-06-06 21:10:57 +0000603 def add_agent(self, agent):
604 self._agents.append(agent)
605 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000606 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
607 self._register_agent_for_ids(self._queue_entry_agents,
608 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000609
showard170873e2009-01-07 00:22:26 +0000610
611 def get_agents_for_entry(self, queue_entry):
612 """
613 Find agents corresponding to the specified queue_entry.
614 """
615 return self._queue_entry_agents.get(queue_entry.id, set())
616
617
618 def host_has_agent(self, host):
619 """
620 Determine if there is currently an Agent present using this host.
621 """
622 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000623
624
jadmanski0afbb632008-06-06 21:10:57 +0000625 def remove_agent(self, agent):
626 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000627 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
628 agent)
629 self._unregister_agent_for_ids(self._queue_entry_agents,
630 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000631
632
showard4c5374f2008-09-04 17:02:56 +0000633 def num_running_processes(self):
634 return sum(agent.num_processes for agent in self._agents
635 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000636
637
showard170873e2009-01-07 00:22:26 +0000638 def _extract_execution_tag(self, command_line):
639 match = re.match(r'.* -P (\S+) ', command_line)
640 if not match:
641 return None
642 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000643
644
    def _recover_queue_entries(self, queue_entries, run_monitor):
        # Re-attach a group of queue entries (all from the same job) to an
        # already-running autoserv process via a RecoveryQueueTask, wrapped
        # in one Agent accounting for one process per entry.
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000652
653
    def _recover_processes(self):
        # Full process-recovery sequence after a scheduler restart: register
        # known pidfiles, re-sync drone state, then re-attach or requeue
        # entries in each recoverable state.
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000666
showard170873e2009-01-07 00:22:26 +0000667
    def _register_pidfiles(self):
        # during recovery we may need to read pidfiles for both running and
        # parsing entries
        queue_entries = HostQueueEntry.fetch(
            where="status IN ('Running', 'Parsing')")
        for queue_entry in queue_entries:
            pidfile_id = _drone_manager.get_pidfile_id_from(
                queue_entry.execution_tag())
            _drone_manager.register_pidfile(pidfile_id)
677
678
679 def _recover_running_entries(self):
680 orphans = _drone_manager.get_orphaned_autoserv_processes()
681
682 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
683 requeue_entries = []
684 for queue_entry in queue_entries:
685 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000686 # synchronous job we've already recovered
687 continue
showard170873e2009-01-07 00:22:26 +0000688 execution_tag = queue_entry.execution_tag()
689 run_monitor = PidfileRunMonitor()
690 run_monitor.attach_to_existing_process(execution_tag)
691 if not run_monitor.has_process():
692 # autoserv apparently never got run, so let it get requeued
693 continue
showarde788ea62008-11-17 21:02:47 +0000694 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000695 logging.info('Recovering %s (process %s)',
696 (', '.join(str(entry) for entry in queue_entries),
697 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000698 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000699 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000700
jadmanski0afbb632008-06-06 21:10:57 +0000701 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000702 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000703 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000704 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000705
showard170873e2009-01-07 00:22:26 +0000706
707 def _recover_aborting_entries(self):
708 queue_entries = HostQueueEntry.fetch(
709 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000710 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000711 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000712 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000713
showard97aed502008-11-04 02:01:24 +0000714
showard170873e2009-01-07 00:22:26 +0000715 def _requeue_other_active_entries(self):
716 queue_entries = HostQueueEntry.fetch(
717 where='active AND NOT complete AND status != "Pending"')
718 for queue_entry in queue_entries:
719 if self.get_agents_for_entry(queue_entry):
720 # entry has already been recovered
721 continue
showardb18134f2009-03-20 20:52:18 +0000722 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
723 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000724 if queue_entry.host:
725 tasks = queue_entry.host.reverify_tasks()
726 self.add_agent(Agent(tasks))
727 agent = queue_entry.requeue()
728
729
    def _reverify_remaining_hosts(self):
        """
        Reverify hosts left in transient states by a scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000745
746
jadmanski0afbb632008-06-06 21:10:57 +0000747 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000748 print_message='Reverifying host %s'):
749 full_where='locked = 0 AND invalid = 0 AND ' + where
750 for host in Host.fetch(where=full_where):
751 if self.host_has_agent(host):
752 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000753 continue
showard170873e2009-01-07 00:22:26 +0000754 if print_message:
showardb18134f2009-03-20 20:52:18 +0000755 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000756 tasks = host.reverify_tasks()
757 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000758
759
showard97aed502008-11-04 02:01:24 +0000760 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000761 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000762 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000763 if entry.id in recovered_entry_ids:
764 continue
765 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000766 recovered_entry_ids = recovered_entry_ids.union(
767 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000768 logging.info('Recovering parsing entries %s',
769 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000770
771 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000772 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000773
774
jadmanski0afbb632008-06-06 21:10:57 +0000775 def _recover_hosts(self):
776 # recover "Repair Failed" hosts
777 message = 'Reverifying dead host %s'
778 self._reverify_hosts_where("status = 'Repair Failed'",
779 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000780
781
showard3bb499f2008-07-03 19:42:20 +0000782 def _abort_timed_out_jobs(self):
783 """
784 Aborts all jobs that have timed out and not completed
785 """
showarda3ab0d52008-11-03 19:03:47 +0000786 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
787 where=['created_on + INTERVAL timeout HOUR < NOW()'])
788 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000789 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000790 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000791
792
showard98863972008-10-29 21:14:56 +0000793 def _abort_jobs_past_synch_start_timeout(self):
794 """
795 Abort synchronous jobs that are past the start timeout (from global
796 config) and are holding a machine that's in everyone.
797 """
798 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000799 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000800 timeout_start = datetime.datetime.now() - timeout_delta
801 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000802 created_on__lt=timeout_start,
803 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000804 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000805 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000806 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000807 entries_to_abort = job.hostqueueentry_set.exclude(
808 status=models.HostQueueEntry.Status.RUNNING)
809 for queue_entry in entries_to_abort:
810 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000811
812
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        # (the LEFT JOIN pairs each block with its job's incomplete entries;
        # a NULL join result means the job is finished, so delete the block)
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000825
826
    def _get_pending_queue_entries(self):
        """
        Fetch all schedulable queue entries, highest priority first.

        @returns A list of HostQueueEntry objects.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000833
834
showard89f84db2009-03-12 20:39:13 +0000835 def _refresh_pending_queue_entries(self):
836 """
837 Lookup the pending HostQueueEntries and call our HostScheduler
838 refresh() method given that list. Return the list.
839
840 @returns A list of pending HostQueueEntries sorted in priority order.
841 """
showard63a34772008-08-18 19:32:50 +0000842 queue_entries = self._get_pending_queue_entries()
843 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000844 return []
showardb95b1bd2008-08-15 18:11:04 +0000845
showard63a34772008-08-18 19:32:50 +0000846 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000847
showard89f84db2009-03-12 20:39:13 +0000848 return queue_entries
849
850
851 def _schedule_atomic_group(self, queue_entry):
852 """
853 Schedule the given queue_entry on an atomic group of hosts.
854
855 Returns immediately if there are insufficient available hosts.
856
857 Creates new HostQueueEntries based off of queue_entry for the
858 scheduled hosts and starts them all running.
859 """
860 # This is a virtual host queue entry representing an entire
861 # atomic group, find a group and schedule their hosts.
862 group_hosts = self._host_scheduler.find_eligible_atomic_group(
863 queue_entry)
864 if not group_hosts:
865 return
866 # The first assigned host uses the original HostQueueEntry
867 group_queue_entries = [queue_entry]
868 for assigned_host in group_hosts[1:]:
869 # Create a new HQE for every additional assigned_host.
870 new_hqe = HostQueueEntry.clone(queue_entry)
871 new_hqe.save()
872 group_queue_entries.append(new_hqe)
873 assert len(group_queue_entries) == len(group_hosts)
874 for queue_entry, host in itertools.izip(group_queue_entries,
875 group_hosts):
876 self._run_queue_entry(queue_entry, host)
877
878
879 def _schedule_new_jobs(self):
880 queue_entries = self._refresh_pending_queue_entries()
881 if not queue_entries:
882 return
883
showard63a34772008-08-18 19:32:50 +0000884 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000885 if (queue_entry.atomic_group_id is None or
886 queue_entry.host_id is not None):
887 assigned_host = self._host_scheduler.find_eligible_host(
888 queue_entry)
889 if assigned_host:
890 self._run_queue_entry(queue_entry, assigned_host)
891 else:
892 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000893
894
    def _run_queue_entry(self, queue_entry, host):
        """Start the given queue entry running on the given host."""
        agent = queue_entry.run(assigned_host=host)
        # in some cases (synchronous jobs with run_verify=False), agent may be
        # None
        if agent:
            self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000901
902
jadmanski0afbb632008-06-06 21:10:57 +0000903 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000904 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000905 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000906 for agent in agents_to_abort:
907 self.remove_agent(agent)
908
showard170873e2009-01-07 00:22:26 +0000909 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000910
911
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Apply process-throttling policy: return True if the given agent may
        start its processes during this cycle.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
933
934
    def _handle_agents(self):
        """
        Drive every agent one step: reap finished agents, start eligible
        idle ones (subject to process throttling), and tick running ones.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                # idle agent: check throttling policy before its first tick
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000953
954
showardfa8629c2008-11-04 16:51:23 +0000955 def _check_for_db_inconsistencies(self):
956 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
957 if query.count() != 0:
958 subject = ('%d queue entries found with active=complete=1'
959 % query.count())
960 message = '\n'.join(str(entry.get_object_dict())
961 for entry in query[:50])
962 if len(query) > 50:
963 message += '\n(truncated)\n'
964
showardb18134f2009-03-20 20:52:18 +0000965 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000966 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000967
968
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().

    Tracks one autoserv-style process through its pidfile via the global
    _drone_manager; self._state mirrors the last-read pidfile contents.
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process (see on_lost_process())
        self.lost_process = False
        # wall-clock time of run()/attach, used for the pidfile timeout
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # NOTE(review): run() below inlines this logic (with an
        # "is not None" check instead of truthiness) rather than calling
        # this helper -- it appears to be dead code; confirm before removing
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start a new process under the drone manager."""
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Monitor an already-running process found by its execution tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        # only attempt the kill if a pid has actually been recorded
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        """Refresh self._state from the pidfile; raise on invalid contents."""
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset state before raising so callers never see garbage
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # %-formatting binds tighter than +, so the format applies only to
        # the second literal here
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # only give up after the process has had PIDFILE_TIMEOUT seconds to
        # write its pidfile
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report a generic failure exit status with zero failed tests
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001124
1125
class Agent(object):
    """
    Runs a FIFO queue of AgentTasks on behalf of the dispatcher.  When a
    task fails, the remaining queue is dropped and the task's failure_tasks
    are handed back to the dispatcher inside a brand-new Agent.
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        # remember every queue entry and host any of our tasks touches, so
        # the dispatcher can look us up by entry or by host
        entry_id_lists = [task.queue_entry_ids for task in tasks]
        host_id_lists = [task.host_ids for task in tasks]
        self.queue_entry_ids = self._union_ids(entry_id_lists)
        self.host_ids = self._union_ids(host_id_lists)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten the id lists into a single de-duplicated set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        task.agent = self
        self.queue.put_nowait(task)


    def tick(self):
        """Poll the active task and advance through the queue as tasks end."""
        while not self.is_done():
            current = self.active_task
            if current and not current.is_done():
                current.poll()
                if not current.is_done():
                    # still running; nothing more to do this cycle
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()

            if not finished.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        # drop whatever was left in our queue
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids
        # will get reset
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001194
jadmanski0afbb632008-06-06 21:10:57 +00001195
class AgentTask(object):
    """
    A single unit of work executed by an Agent: optionally runs one external
    command through a PidfileRunMonitor, invoking prolog()/epilog() hooks
    around it.  Subclasses (RepairTask, VerifyTask, ...) supply the command
    and override the hooks.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: argv list for the process to run, or None if this task
                runs no process itself.
        @param working_directory: directory the drone manager executes cmd in.
        @param failure_tasks: tasks handed to a new Agent if this task fails.
        """
        self.done = False
        # don't use a mutable default argument ([]) -- give each instance
        # its own list so one task's failure_tasks can never leak to another
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None       # set by Agent.add_task()
        self.monitor = None     # PidfileRunMonitor, set by run()
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry IDs this task operates on."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check on the running process, finishing the task if it's done."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            # no process was ever started; treat as a failure
            self.finished(False)


    def tick(self, exit_code):
        """Handle an exit code from the monitor (None = still running)."""
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run the epilog hook."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts; overridden by subclasses."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # suffix is kept for interface compatibility with existing callers;
        # the drone manager chooses the actual temporary path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy this task's log file to the results repository, if any."""
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes; overridden by subclasses."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the running process (if any) and mark the task done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point self.log_file at a per-host log under hosts/<hostname>/."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # every entry in a synchronous group must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        """Launch self.cmd under a PidfileRunMonitor, if a command is set."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001318
1319
class RepairTask(AgentTask):
    """
    Runs autoserv -R to repair a host, honoring the host's protection level.
    On failure the host goes to "Repair Failed" and, if a non-metahost queue
    entry was supplied, that entry is failed with this task's results.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # free the entry while repair runs; it may get rescheduled elsewhere
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        """Mark the associated queue entry failed and parse its results."""
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001380
1381
showard8fe93b52008-11-18 17:53:22 +00001382class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001383 def epilog(self):
1384 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001385 should_copy_results = (self.queue_entry and not self.success
1386 and not self.queue_entry.meta_host)
1387 if should_copy_results:
1388 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001389 destination = os.path.join(self.queue_entry.execution_tag(),
1390 os.path.basename(self.log_file))
1391 _drone_manager.copy_to_results_repository(
1392 self.monitor.get_process(), self.log_file,
1393 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001394
1395
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' on a host; schedules a RepairTask on failure."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be supplied
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        results_path = _drone_manager.absolute_path(self.temp_results_dir)
        cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
               results_path]
        failure_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(cmd, self.temp_results_dir,
                                         failure_tasks=failure_tasks)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        entry = self.queue_entry
        if entry:
            entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001426
1427
mbligh36768f02008-02-22 18:28:33 +00001428class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001429 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001430 self.job = job
1431 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001432 super(QueueTask, self).__init__(cmd, self._execution_tag())
1433 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001434
1435
showard170873e2009-01-07 00:22:26 +00001436 def _format_keyval(self, key, value):
1437 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001438
1439
showard73ec0442009-02-07 02:05:20 +00001440 def _keyval_path(self):
1441 return os.path.join(self._execution_tag(), 'keyval')
1442
1443
1444 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1445 keyval_contents = '\n'.join(self._format_keyval(key, value)
1446 for key, value in keyval_dict.iteritems())
1447 # always end with a newline to allow additional keyvals to be written
1448 keyval_contents += '\n'
1449 _drone_manager.attach_file_to_execution(self._execution_tag(),
1450 keyval_contents,
1451 file_path=keyval_path)
1452
1453
1454 def _write_keyvals_before_job(self, keyval_dict):
1455 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1456
1457
1458 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001459 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001460 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001461 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001462 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001463
1464
showard170873e2009-01-07 00:22:26 +00001465 def _write_host_keyvals(self, host):
1466 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1467 host.hostname)
1468 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001469 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1470 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001471
1472
showard170873e2009-01-07 00:22:26 +00001473 def _execution_tag(self):
1474 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001475
1476
jadmanski0afbb632008-06-06 21:10:57 +00001477 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001478 queued = int(time.mktime(self.job.created_on.timetuple()))
1479 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001480 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001481 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001482 queue_entry.set_status('Running')
1483 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001484 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001485 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001486 assert len(self.queue_entries) == 1
1487 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001488
1489
showard35162b02009-03-03 02:17:30 +00001490 def _write_lost_process_error_file(self):
1491 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1492 _drone_manager.write_lines_to_file(error_file_path,
1493 [_LOST_PROCESS_ERROR])
1494
1495
showard97aed502008-11-04 02:01:24 +00001496 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001497 if self.monitor.has_process():
1498 self._write_keyval_after_job("job_finished", int(time.time()))
1499 self._copy_and_parse_results(self.queue_entries)
1500
1501 if self.monitor.lost_process:
1502 self._write_lost_process_error_file()
1503 for queue_entry in self.queue_entries:
1504 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001505
1506
showardcbd74612008-11-19 21:42:02 +00001507 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001508 _drone_manager.write_lines_to_file(
1509 os.path.join(self._execution_tag(), 'status.log'),
1510 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001511 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001512
1513
jadmanskif7fa2cc2008-10-01 14:13:23 +00001514 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001515 if not self.monitor or not self.monitor.has_process():
1516 return
1517
jadmanskif7fa2cc2008-10-01 14:13:23 +00001518 # build up sets of all the aborted_by and aborted_on values
1519 aborted_by, aborted_on = set(), set()
1520 for queue_entry in self.queue_entries:
1521 if queue_entry.aborted_by:
1522 aborted_by.add(queue_entry.aborted_by)
1523 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1524 aborted_on.add(t)
1525
1526 # extract some actual, unique aborted by value and write it out
1527 assert len(aborted_by) <= 1
1528 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001529 aborted_by_value = aborted_by.pop()
1530 aborted_on_value = max(aborted_on)
1531 else:
1532 aborted_by_value = 'autotest_system'
1533 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001534
showarda0382352009-02-11 23:36:43 +00001535 self._write_keyval_after_job("aborted_by", aborted_by_value)
1536 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001537
showardcbd74612008-11-19 21:42:02 +00001538 aborted_on_string = str(datetime.datetime.fromtimestamp(
1539 aborted_on_value))
1540 self._write_status_comment('Job aborted by %s on %s' %
1541 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001542
1543
jadmanski0afbb632008-06-06 21:10:57 +00001544 def abort(self):
1545 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001546 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001547 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001548
1549
showard21baa452008-10-21 00:08:39 +00001550 def _reboot_hosts(self):
1551 reboot_after = self.job.reboot_after
1552 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001553 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001554 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001555 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001556 num_tests_failed = self.monitor.num_tests_failed()
1557 do_reboot = (self.success and num_tests_failed == 0)
1558
showard8ebca792008-11-04 21:54:22 +00001559 for queue_entry in self.queue_entries:
1560 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001561 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001562 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001563 cleanup_task = CleanupTask(host=queue_entry.get_host())
1564 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001565 else:
1566 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001567
1568
jadmanski0afbb632008-06-06 21:10:57 +00001569 def epilog(self):
1570 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001571 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001572 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001573
showardb18134f2009-03-20 20:52:18 +00001574 logging.info("queue_task finished with succes=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001575
1576
mblighbb421852008-03-11 22:36:16 +00001577class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001578 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001579 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001580 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001581
1582
jadmanski0afbb632008-06-06 21:10:57 +00001583 def run(self):
1584 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001585
1586
jadmanski0afbb632008-06-06 21:10:57 +00001587 def prolog(self):
1588 # recovering an existing process - don't do prolog
1589 pass
mblighbb421852008-03-11 22:36:16 +00001590
1591
showard8fe93b52008-11-18 17:53:22 +00001592class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001593 def __init__(self, host=None, queue_entry=None):
1594 assert bool(host) ^ bool(queue_entry)
1595 if queue_entry:
1596 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001597 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001598 self.host = host
showard170873e2009-01-07 00:22:26 +00001599
1600 self.create_temp_resultsdir('.cleanup')
1601 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1602 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001603 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001604 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1605 failure_tasks=[repair_task])
1606
1607 self._set_ids(host=host, queue_entries=[queue_entry])
1608 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001609
mblighd5c95802008-03-05 00:33:46 +00001610
jadmanski0afbb632008-06-06 21:10:57 +00001611 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001612 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001613 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001614 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001615
mblighd5c95802008-03-05 00:33:46 +00001616
showard21baa452008-10-21 00:08:39 +00001617 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001618 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001619 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001620 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001621 self.host.update_field('dirty', 0)
1622
1623
mblighd5c95802008-03-05 00:33:46 +00001624class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001625 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001626 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001627 self.queue_entry = queue_entry
1628 # don't use _set_ids, since we don't want to set the host_ids
1629 self.queue_entry_ids = [queue_entry.id]
1630 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001631
1632
jadmanski0afbb632008-06-06 21:10:57 +00001633 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001634 logging.info("starting abort on host %s, job %s",
1635 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001636
mblighd64e5702008-04-04 21:39:28 +00001637
jadmanski0afbb632008-06-06 21:10:57 +00001638 def epilog(self):
1639 super(AbortTask, self).epilog()
1640 self.queue_entry.set_status('Aborted')
1641 self.success = True
1642
1643
1644 def run(self):
1645 for agent in self.agents_to_abort:
1646 if (agent.active_task):
1647 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001648
1649
showard97aed502008-11-04 02:01:24 +00001650class FinalReparseTask(AgentTask):
showard97aed502008-11-04 02:01:24 +00001651 _num_running_parses = 0
1652
1653 def __init__(self, queue_entries):
1654 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001655 # don't use _set_ids, since we don't want to set the host_ids
1656 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001657 self._parse_started = False
1658
1659 assert len(queue_entries) > 0
1660 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001661
showard170873e2009-01-07 00:22:26 +00001662 self._execution_tag = queue_entry.execution_tag()
1663 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1664 self._autoserv_monitor = PidfileRunMonitor()
1665 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1666 self._final_status = self._determine_final_status()
1667
showard97aed502008-11-04 02:01:24 +00001668 if _testing_mode:
1669 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001670 else:
1671 super(FinalReparseTask, self).__init__(
1672 cmd=self._generate_parse_command(),
1673 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001674
showard170873e2009-01-07 00:22:26 +00001675 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001676
1677
1678 @classmethod
1679 def _increment_running_parses(cls):
1680 cls._num_running_parses += 1
1681
1682
1683 @classmethod
1684 def _decrement_running_parses(cls):
1685 cls._num_running_parses -= 1
1686
1687
1688 @classmethod
1689 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001690 return (cls._num_running_parses <
1691 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001692
1693
showard170873e2009-01-07 00:22:26 +00001694 def _determine_final_status(self):
1695 # we'll use a PidfileRunMonitor to read the autoserv exit status
1696 if self._autoserv_monitor.exit_code() == 0:
1697 return models.HostQueueEntry.Status.COMPLETED
1698 return models.HostQueueEntry.Status.FAILED
1699
1700
showard97aed502008-11-04 02:01:24 +00001701 def prolog(self):
1702 super(FinalReparseTask, self).prolog()
1703 for queue_entry in self._queue_entries:
1704 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1705
1706
1707 def epilog(self):
1708 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001709 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001710 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001711
1712
showard2bab8f42008-11-12 18:15:22 +00001713 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001714 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1715 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001716
1717
1718 def poll(self):
1719 # override poll to keep trying to start until the parse count goes down
1720 # and we can, at which point we revert to default behavior
1721 if self._parse_started:
1722 super(FinalReparseTask, self).poll()
1723 else:
1724 self._try_starting_parse()
1725
1726
1727 def run(self):
1728 # override run() to not actually run unless we can
1729 self._try_starting_parse()
1730
1731
1732 def _try_starting_parse(self):
1733 if not self._can_run_new_parse():
1734 return
showard170873e2009-01-07 00:22:26 +00001735
showard678df4f2009-02-04 21:36:39 +00001736 # make sure we actually have results to parse
showard35162b02009-03-03 02:17:30 +00001737 # this should never happen in normal operation
showard678df4f2009-02-04 21:36:39 +00001738 if not self._autoserv_monitor.has_process():
1739 email_manager.manager.enqueue_notify_email(
1740 'No results to parse',
1741 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1742 self.finished(False)
1743 return
1744
showard97aed502008-11-04 02:01:24 +00001745 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001746 self.monitor = PidfileRunMonitor()
1747 self.monitor.run(self.cmd, self._working_directory,
1748 log_file=self.log_file,
1749 pidfile_name='.parser_execute',
1750 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1751
showard97aed502008-11-04 02:01:24 +00001752 self._increment_running_parses()
1753 self._parse_started = True
1754
1755
1756 def finished(self, success):
1757 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001758 if self._parse_started:
1759 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001760
1761
showardc9ae1782009-01-30 01:42:37 +00001762class SetEntryPendingTask(AgentTask):
1763 def __init__(self, queue_entry):
1764 super(SetEntryPendingTask, self).__init__(cmd='')
1765 self._queue_entry = queue_entry
1766 self._set_ids(queue_entries=[queue_entry])
1767
1768
1769 def run(self):
1770 agent = self._queue_entry.on_pending()
1771 if agent:
1772 self.agent.dispatcher.add_agent(agent)
1773 self.finished(True)
1774
1775
showarda3c58572009-03-12 20:36:59 +00001776class DBError(Exception):
1777 """Raised by the DBObject constructor when its select fails."""
1778
1779
mbligh36768f02008-02-22 18:28:33 +00001780class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001781 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001782
1783 # Subclasses MUST override these:
1784 _table_name = ''
1785 _fields = ()
1786
showarda3c58572009-03-12 20:36:59 +00001787 # A mapping from (type, id) to the instance of the object for that
1788 # particular id. This prevents us from creating new Job() and Host()
1789 # instances for every HostQueueEntry object that we instantiate as
1790 # multiple HQEs often share the same Job.
1791 _instances_by_type_and_id = weakref.WeakValueDictionary()
1792 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001793
showarda3c58572009-03-12 20:36:59 +00001794
1795 def __new__(cls, id=None, **kwargs):
1796 """
1797 Look to see if we already have an instance for this particular type
1798 and id. If so, use it instead of creating a duplicate instance.
1799 """
1800 if id is not None:
1801 instance = cls._instances_by_type_and_id.get((cls, id))
1802 if instance:
1803 return instance
1804 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1805
1806
1807 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001808 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001809 assert self._table_name, '_table_name must be defined in your class'
1810 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001811 if not new_record:
1812 if self._initialized and not always_query:
1813 return # We've already been initialized.
1814 if id is None:
1815 id = row[0]
1816 # Tell future constructors to use us instead of re-querying while
1817 # this instance is still around.
1818 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001819
showard6ae5ea92009-02-25 00:11:51 +00001820 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001821
jadmanski0afbb632008-06-06 21:10:57 +00001822 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001823
jadmanski0afbb632008-06-06 21:10:57 +00001824 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001825 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001826
showarda3c58572009-03-12 20:36:59 +00001827 if self._initialized:
1828 differences = self._compare_fields_in_row(row)
1829 if differences:
showard7629f142009-03-27 21:02:02 +00001830 logging.warn(
1831 'initialized %s %s instance requery is updating: %s',
1832 type(self), self.id, differences)
showard2bab8f42008-11-12 18:15:22 +00001833 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001834 self._initialized = True
1835
1836
1837 @classmethod
1838 def _clear_instance_cache(cls):
1839 """Used for testing, clear the internal instance cache."""
1840 cls._instances_by_type_and_id.clear()
1841
1842
showardccbd6c52009-03-21 00:10:21 +00001843 def _fetch_row_from_db(self, row_id):
1844 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1845 rows = _db.execute(sql, (row_id,))
1846 if not rows:
1847 raise DBError("row not found (table=%s, id=%s)"
1848 % (self.__table, id))
1849 return rows[0]
1850
1851
showarda3c58572009-03-12 20:36:59 +00001852 def _assert_row_length(self, row):
1853 assert len(row) == len(self._fields), (
1854 "table = %s, row = %s/%d, fields = %s/%d" % (
1855 self.__table, row, len(row), self._fields, len(self._fields)))
1856
1857
1858 def _compare_fields_in_row(self, row):
1859 """
1860 Given a row as returned by a SELECT query, compare it to our existing
1861 in memory fields.
1862
1863 @param row - A sequence of values corresponding to fields named in
1864 The class attribute _fields.
1865
1866 @returns A dictionary listing the differences keyed by field name
1867 containing tuples of (current_value, row_value).
1868 """
1869 self._assert_row_length(row)
1870 differences = {}
1871 for field, row_value in itertools.izip(self._fields, row):
1872 current_value = getattr(self, field)
1873 if current_value != row_value:
1874 differences[field] = (current_value, row_value)
1875 return differences
showard2bab8f42008-11-12 18:15:22 +00001876
1877
1878 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001879 """
1880 Update our field attributes using a single row returned by SELECT.
1881
1882 @param row - A sequence of values corresponding to fields named in
1883 the class fields list.
1884 """
1885 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001886
showard2bab8f42008-11-12 18:15:22 +00001887 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001888 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001889 setattr(self, field, value)
1890 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001891
showard2bab8f42008-11-12 18:15:22 +00001892 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001893
mblighe2586682008-02-29 22:45:46 +00001894
showardccbd6c52009-03-21 00:10:21 +00001895 def update_from_database(self):
1896 assert self.id is not None
1897 row = self._fetch_row_from_db(self.id)
1898 self._update_fields_from_row(row)
1899
1900
jadmanski0afbb632008-06-06 21:10:57 +00001901 def count(self, where, table = None):
1902 if not table:
1903 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001904
jadmanski0afbb632008-06-06 21:10:57 +00001905 rows = _db.execute("""
1906 SELECT count(*) FROM %s
1907 WHERE %s
1908 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001909
jadmanski0afbb632008-06-06 21:10:57 +00001910 assert len(rows) == 1
1911
1912 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001913
1914
mblighf8c624d2008-07-03 16:58:45 +00001915 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001916 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001917
showard2bab8f42008-11-12 18:15:22 +00001918 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001919 return
mbligh36768f02008-02-22 18:28:33 +00001920
mblighf8c624d2008-07-03 16:58:45 +00001921 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1922 if condition:
1923 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001924 _db.execute(query, (value, self.id))
1925
showard2bab8f42008-11-12 18:15:22 +00001926 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001927
1928
jadmanski0afbb632008-06-06 21:10:57 +00001929 def save(self):
1930 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001931 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001932 columns = ','.join([str(key) for key in keys])
1933 values = ['"%s"' % self.__dict__[key] for key in keys]
showard89f84db2009-03-12 20:39:13 +00001934 values_str = ','.join(values)
1935 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1936 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00001937 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00001938 # Update our id to the one the database just assigned to us.
1939 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00001940
1941
jadmanski0afbb632008-06-06 21:10:57 +00001942 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001943 self._instances_by_type_and_id.pop((type(self), id), None)
1944 self._initialized = False
1945 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00001946 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1947 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001948
1949
showard63a34772008-08-18 19:32:50 +00001950 @staticmethod
1951 def _prefix_with(string, prefix):
1952 if string:
1953 string = prefix + string
1954 return string
1955
1956
jadmanski0afbb632008-06-06 21:10:57 +00001957 @classmethod
showard989f25d2008-10-01 11:38:11 +00001958 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00001959 """
1960 Construct instances of our class based on the given database query.
1961
1962 @yields One class instance for each row fetched.
1963 """
showard63a34772008-08-18 19:32:50 +00001964 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1965 where = cls._prefix_with(where, 'WHERE ')
1966 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00001967 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00001968 'joins' : joins,
1969 'where' : where,
1970 'order_by' : order_by})
1971 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001972 for row in rows:
1973 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001974
mbligh36768f02008-02-22 18:28:33 +00001975
class IneligibleHostQueue(DBObject):
    """ORM mapping for a row of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001979
1980
showard89f84db2009-03-12 20:39:13 +00001981class AtomicGroup(DBObject):
1982 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00001983 _fields = ('id', 'name', 'description', 'max_number_of_machines',
1984 'invalid')
showard89f84db2009-03-12 20:39:13 +00001985
1986
showard989f25d2008-10-01 11:38:11 +00001987class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001988 _table_name = 'labels'
1989 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00001990 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001991
1992
mbligh36768f02008-02-22 18:28:33 +00001993class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001994 _table_name = 'hosts'
1995 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1996 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1997
1998
jadmanski0afbb632008-06-06 21:10:57 +00001999 def current_task(self):
2000 rows = _db.execute("""
2001 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2002 """, (self.id,))
2003
2004 if len(rows) == 0:
2005 return None
2006 else:
2007 assert len(rows) == 1
2008 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002009 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002010
2011
jadmanski0afbb632008-06-06 21:10:57 +00002012 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002013 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002014 if self.current_task():
2015 self.current_task().requeue()
2016
showard6ae5ea92009-02-25 00:11:51 +00002017
jadmanski0afbb632008-06-06 21:10:57 +00002018 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002019 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002020 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002021
2022
showard170873e2009-01-07 00:22:26 +00002023 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002024 """
showard170873e2009-01-07 00:22:26 +00002025 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002026 """
2027 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002028 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002029 FROM labels
2030 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002031 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002032 ORDER BY labels.name
2033 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002034 platform = None
2035 all_labels = []
2036 for label_name, is_platform in rows:
2037 if is_platform:
2038 platform = label_name
2039 all_labels.append(label_name)
2040 return platform, all_labels
2041
2042
2043 def reverify_tasks(self):
2044 cleanup_task = CleanupTask(host=self)
2045 verify_task = VerifyTask(host=self)
2046 # just to make sure this host does not get taken away
2047 self.set_status('Cleaning')
2048 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002049
2050
mbligh36768f02008-02-22 18:28:33 +00002051class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002052 _table_name = 'host_queue_entries'
2053 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002054 'active', 'complete', 'deleted', 'execution_subdir',
2055 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002056
2057
showarda3c58572009-03-12 20:36:59 +00002058 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002059 assert id or row
showarda3c58572009-03-12 20:36:59 +00002060 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002061 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002062
jadmanski0afbb632008-06-06 21:10:57 +00002063 if self.host_id:
2064 self.host = Host(self.host_id)
2065 else:
2066 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002067
showard170873e2009-01-07 00:22:26 +00002068 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002069 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002070
2071
showard89f84db2009-03-12 20:39:13 +00002072 @classmethod
2073 def clone(cls, template):
2074 """
2075 Creates a new row using the values from a template instance.
2076
2077 The new instance will not exist in the database or have a valid
2078 id attribute until its save() method is called.
2079 """
2080 assert isinstance(template, cls)
2081 new_row = [getattr(template, field) for field in cls._fields]
2082 clone = cls(row=new_row, new_record=True)
2083 clone.id = None
2084 return clone
2085
2086
showardc85c21b2008-11-24 22:17:37 +00002087 def _view_job_url(self):
2088 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def set_host(self, host):
2092 if host:
2093 self.queue_log_record('Assigning host ' + host.hostname)
2094 self.update_field('host_id', host.id)
2095 self.update_field('active', True)
2096 self.block_host(host.id)
2097 else:
2098 self.queue_log_record('Releasing host')
2099 self.unblock_host(self.host.id)
2100 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002101
jadmanski0afbb632008-06-06 21:10:57 +00002102 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002103
2104
jadmanski0afbb632008-06-06 21:10:57 +00002105 def get_host(self):
2106 return self.host
mbligh36768f02008-02-22 18:28:33 +00002107
2108
jadmanski0afbb632008-06-06 21:10:57 +00002109 def queue_log_record(self, log_line):
2110 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002111 _drone_manager.write_lines_to_file(self.queue_log_path,
2112 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002113
2114
jadmanski0afbb632008-06-06 21:10:57 +00002115 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002116 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002117 row = [0, self.job.id, host_id]
2118 block = IneligibleHostQueue(row=row, new_record=True)
2119 block.save()
mblighe2586682008-02-29 22:45:46 +00002120
2121
jadmanski0afbb632008-06-06 21:10:57 +00002122 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002123 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002124 blocks = IneligibleHostQueue.fetch(
2125 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2126 for block in blocks:
2127 block.delete()
mblighe2586682008-02-29 22:45:46 +00002128
2129
showard2bab8f42008-11-12 18:15:22 +00002130 def set_execution_subdir(self, subdir=None):
2131 if subdir is None:
2132 assert self.get_host()
2133 subdir = self.get_host().hostname
2134 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002135
2136
showard6355f6b2008-12-05 18:52:13 +00002137 def _get_hostname(self):
2138 if self.host:
2139 return self.host.hostname
2140 return 'no host'
2141
2142
showard170873e2009-01-07 00:22:26 +00002143 def __str__(self):
2144 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2145
2146
jadmanski0afbb632008-06-06 21:10:57 +00002147 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002148 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2149 if status not in abort_statuses:
2150 condition = ' AND '.join(['status <> "%s"' % x
2151 for x in abort_statuses])
2152 else:
2153 condition = ''
2154 self.update_field('status', status, condition=condition)
2155
showardb18134f2009-03-20 20:52:18 +00002156 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002157
showardc85c21b2008-11-24 22:17:37 +00002158 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002159 self.update_field('complete', False)
2160 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002161
jadmanski0afbb632008-06-06 21:10:57 +00002162 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002163 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002164 self.update_field('complete', False)
2165 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002166
showardc85c21b2008-11-24 22:17:37 +00002167 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002168 self.update_field('complete', True)
2169 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002170
2171 should_email_status = (status.lower() in _notify_email_statuses or
2172 'all' in _notify_email_statuses)
2173 if should_email_status:
2174 self._email_on_status(status)
2175
2176 self._email_on_job_complete()
2177
2178
2179 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002180 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002181
2182 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2183 self.job.id, self.job.name, hostname, status)
2184 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2185 self.job.id, self.job.name, hostname, status,
2186 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002187 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002188
2189
2190 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002191 if not self.job.is_finished():
2192 return
showard542e8402008-09-19 20:16:18 +00002193
showardc85c21b2008-11-24 22:17:37 +00002194 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002195 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002196 for queue_entry in hosts_queue:
2197 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002198 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002199 queue_entry.status))
2200
2201 summary_text = "\n".join(summary_text)
2202 status_counts = models.Job.objects.get_status_counts(
2203 [self.job.id])[self.job.id]
2204 status = ', '.join('%d %s' % (count, status) for status, count
2205 in status_counts.iteritems())
2206
2207 subject = 'Autotest: Job ID: %s "%s" %s' % (
2208 self.job.id, self.job.name, status)
2209 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2210 self.job.id, self.job.name, status, self._view_job_url(),
2211 summary_text)
showard170873e2009-01-07 00:22:26 +00002212 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002213
2214
showard89f84db2009-03-12 20:39:13 +00002215 def run(self, assigned_host=None):
2216 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002217 assert assigned_host
2218 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002219 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002220
showardb18134f2009-03-20 20:52:18 +00002221 logging.info("%s/%s/%s scheduled on %s, status=%s",
2222 self.job.name, self.meta_host, self.atomic_group_id,
2223 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002224
jadmanski0afbb632008-06-06 21:10:57 +00002225 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002226
showard6ae5ea92009-02-25 00:11:51 +00002227
jadmanski0afbb632008-06-06 21:10:57 +00002228 def requeue(self):
2229 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002230 # verify/cleanup failure sets the execution subdir, so reset it here
2231 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002232 if self.meta_host:
2233 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002234
2235
jadmanski0afbb632008-06-06 21:10:57 +00002236 def handle_host_failure(self):
2237 """\
2238 Called when this queue entry's host has failed verification and
2239 repair.
2240 """
2241 assert not self.meta_host
2242 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002243 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002244
2245
jadmanskif7fa2cc2008-10-01 14:13:23 +00002246 @property
2247 def aborted_by(self):
2248 self._load_abort_info()
2249 return self._aborted_by
2250
2251
2252 @property
2253 def aborted_on(self):
2254 self._load_abort_info()
2255 return self._aborted_on
2256
2257
2258 def _load_abort_info(self):
2259 """ Fetch info about who aborted the job. """
2260 if hasattr(self, "_aborted_by"):
2261 return
2262 rows = _db.execute("""
2263 SELECT users.login, aborted_host_queue_entries.aborted_on
2264 FROM aborted_host_queue_entries
2265 INNER JOIN users
2266 ON users.id = aborted_host_queue_entries.aborted_by_id
2267 WHERE aborted_host_queue_entries.queue_entry_id = %s
2268 """, (self.id,))
2269 if rows:
2270 self._aborted_by, self._aborted_on = rows[0]
2271 else:
2272 self._aborted_by = self._aborted_on = None
2273
2274
showardb2e2c322008-10-14 17:33:55 +00002275 def on_pending(self):
2276 """
2277 Called when an entry in a synchronous job has passed verify. If the
2278 job is ready to run, returns an agent to run the job. Returns None
2279 otherwise.
2280 """
2281 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002282 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002283 if self.job.is_ready():
2284 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002285 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002286 return None
2287
2288
showard170873e2009-01-07 00:22:26 +00002289 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002290 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002291 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002292 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002293
showard170873e2009-01-07 00:22:26 +00002294 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002295 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002296 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2297
2298 def execution_tag(self):
2299 assert self.execution_subdir
2300 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002301
2302
mbligh36768f02008-02-22 18:28:33 +00002303class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002304 _table_name = 'jobs'
2305 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2306 'control_type', 'created_on', 'synch_count', 'timeout',
2307 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2308
2309
showarda3c58572009-03-12 20:36:59 +00002310 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002311 assert id or row
showarda3c58572009-03-12 20:36:59 +00002312 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002313
mblighe2586682008-02-29 22:45:46 +00002314
jadmanski0afbb632008-06-06 21:10:57 +00002315 def is_server_job(self):
2316 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002317
2318
showard170873e2009-01-07 00:22:26 +00002319 def tag(self):
2320 return "%s-%s" % (self.id, self.owner)
2321
2322
jadmanski0afbb632008-06-06 21:10:57 +00002323 def get_host_queue_entries(self):
2324 rows = _db.execute("""
2325 SELECT * FROM host_queue_entries
2326 WHERE job_id= %s
2327 """, (self.id,))
2328 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002329
jadmanski0afbb632008-06-06 21:10:57 +00002330 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002331
jadmanski0afbb632008-06-06 21:10:57 +00002332 return entries
mbligh36768f02008-02-22 18:28:33 +00002333
2334
jadmanski0afbb632008-06-06 21:10:57 +00002335 def set_status(self, status, update_queues=False):
2336 self.update_field('status',status)
2337
2338 if update_queues:
2339 for queue_entry in self.get_host_queue_entries():
2340 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002341
2342
jadmanski0afbb632008-06-06 21:10:57 +00002343 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002344 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2345 status='Pending')
2346 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002347
2348
jadmanski0afbb632008-06-06 21:10:57 +00002349 def num_machines(self, clause = None):
2350 sql = "job_id=%s" % self.id
2351 if clause:
2352 sql += " AND (%s)" % clause
2353 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002354
2355
jadmanski0afbb632008-06-06 21:10:57 +00002356 def num_queued(self):
2357 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002358
2359
jadmanski0afbb632008-06-06 21:10:57 +00002360 def num_active(self):
2361 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002362
2363
jadmanski0afbb632008-06-06 21:10:57 +00002364 def num_complete(self):
2365 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002366
2367
jadmanski0afbb632008-06-06 21:10:57 +00002368 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002369 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002370
mbligh36768f02008-02-22 18:28:33 +00002371
showard6bb7c292009-01-30 01:44:51 +00002372 def _not_yet_run_entries(self, include_verifying=True):
2373 statuses = [models.HostQueueEntry.Status.QUEUED,
2374 models.HostQueueEntry.Status.PENDING]
2375 if include_verifying:
2376 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2377 return models.HostQueueEntry.objects.filter(job=self.id,
2378 status__in=statuses)
2379
2380
2381 def _stop_all_entries(self):
2382 entries_to_stop = self._not_yet_run_entries(
2383 include_verifying=False)
2384 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002385 assert not child_entry.complete, (
2386 '%s status=%s, active=%s, complete=%s' %
2387 (child_entry.id, child_entry.status, child_entry.active,
2388 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002389 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2390 child_entry.host.status = models.Host.Status.READY
2391 child_entry.host.save()
2392 child_entry.status = models.HostQueueEntry.Status.STOPPED
2393 child_entry.save()
2394
showard2bab8f42008-11-12 18:15:22 +00002395 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002396 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002397 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002398 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002399
2400
jadmanski0afbb632008-06-06 21:10:57 +00002401 def write_to_machines_file(self, queue_entry):
2402 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002403 file_path = os.path.join(self.tag(), '.machines')
2404 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002405
2406
showard2bab8f42008-11-12 18:15:22 +00002407 def _next_group_name(self):
2408 query = models.HostQueueEntry.objects.filter(
2409 job=self.id).values('execution_subdir').distinct()
2410 subdirs = (entry['execution_subdir'] for entry in query)
2411 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2412 ids = [int(match.group(1)) for match in groups if match]
2413 if ids:
2414 next_id = max(ids) + 1
2415 else:
2416 next_id = 0
2417 return "group%d" % next_id
2418
2419
showard170873e2009-01-07 00:22:26 +00002420 def _write_control_file(self, execution_tag):
2421 control_path = _drone_manager.attach_file_to_execution(
2422 execution_tag, self.control_file)
2423 return control_path
mbligh36768f02008-02-22 18:28:33 +00002424
showardb2e2c322008-10-14 17:33:55 +00002425
showard2bab8f42008-11-12 18:15:22 +00002426 def get_group_entries(self, queue_entry_from_group):
2427 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002428 return list(HostQueueEntry.fetch(
2429 where='job_id=%s AND execution_subdir=%s',
2430 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002431
2432
showardb2e2c322008-10-14 17:33:55 +00002433 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002434 assert queue_entries
2435 execution_tag = queue_entries[0].execution_tag()
2436 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002437 hostnames = ','.join([entry.get_host().hostname
2438 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002439
showard170873e2009-01-07 00:22:26 +00002440 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2441 '-r', _drone_manager.absolute_path(execution_tag),
2442 '-u', self.owner, '-l', self.name, '-m', hostnames,
2443 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002444
jadmanski0afbb632008-06-06 21:10:57 +00002445 if not self.is_server_job():
2446 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002447
showardb2e2c322008-10-14 17:33:55 +00002448 return params
mblighe2586682008-02-29 22:45:46 +00002449
mbligh36768f02008-02-22 18:28:33 +00002450
showardc9ae1782009-01-30 01:42:37 +00002451 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002452 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002453 return True
showard0fc38302008-10-23 00:44:07 +00002454 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002455 return queue_entry.get_host().dirty
2456 return False
showard21baa452008-10-21 00:08:39 +00002457
showardc9ae1782009-01-30 01:42:37 +00002458
2459 def _should_run_verify(self, queue_entry):
2460 do_not_verify = (queue_entry.host.protection ==
2461 host_protections.Protection.DO_NOT_VERIFY)
2462 if do_not_verify:
2463 return False
2464 return self.run_verify
2465
2466
2467 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002468 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002469 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002470 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002471 if self._should_run_verify(queue_entry):
2472 tasks.append(VerifyTask(queue_entry=queue_entry))
2473 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002474 return tasks
2475
2476
showard2bab8f42008-11-12 18:15:22 +00002477 def _assign_new_group(self, queue_entries):
2478 if len(queue_entries) == 1:
2479 group_name = queue_entries[0].get_host().hostname
2480 else:
2481 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002482 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002483 self.id, [entry.host.hostname for entry in queue_entries],
2484 group_name)
2485
2486 for queue_entry in queue_entries:
2487 queue_entry.set_execution_subdir(group_name)
2488
2489
2490 def _choose_group_to_run(self, include_queue_entry):
2491 chosen_entries = [include_queue_entry]
2492
2493 num_entries_needed = self.synch_count - 1
2494 if num_entries_needed > 0:
2495 pending_entries = HostQueueEntry.fetch(
2496 where='job_id = %s AND status = "Pending" AND id != %s',
2497 params=(self.id, include_queue_entry.id))
2498 chosen_entries += list(pending_entries)[:num_entries_needed]
2499
2500 self._assign_new_group(chosen_entries)
2501 return chosen_entries
2502
2503
2504 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002505 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002506 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2507 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002508
showard2bab8f42008-11-12 18:15:22 +00002509 queue_entries = self._choose_group_to_run(queue_entry)
2510 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002511
2512
2513 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002514 for queue_entry in queue_entries:
2515 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002516 params = self._get_autoserv_params(queue_entries)
2517 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2518 cmd=params)
2519 tasks = initial_tasks + [queue_task]
2520 entry_ids = [entry.id for entry in queue_entries]
2521
showard170873e2009-01-07 00:22:26 +00002522 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002523
2524
mbligh36768f02008-02-22 18:28:33 +00002525if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002526 main()