blob: 8993ca60783f2d0c7c04894bef869af3fccda651 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# An explicit install location from the environment overrides the default
# "one directory up from this file" guess.
# (fix: dict.has_key() is deprecated -- use the `in` operator instead)
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# module-level scheduler state, initialized by init()/main()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
mbligh36768f02008-02-22 18:28:33 +000064
def main():
    """Scheduler entry point.

    Parses command-line options, loads configuration, starts the status
    server, then runs the dispatcher tick loop until a shutdown is requested
    (see handle_sigint). Cleanup (emails, status server, drones, DB) always
    runs, even after an uncaught exception.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # statuses listed here trigger notification emails (comma/space separated)
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # test mode: use a dummy autoserv binary and skip result parsing
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # deliberately catch everything so we can email a stacktrace and
        # still perform the shutdown sequence below
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000140
141
142def handle_sigint(signum, frame):
jadmanski0afbb632008-06-06 21:10:57 +0000143 global _shutdown
144 _shutdown = True
showardb18134f2009-03-20 20:52:18 +0000145 logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000146
147
def init(logfile):
    """One-time scheduler startup.

    Sets up logging redirection, the database connection, Django autocommit,
    the SIGINT handler and the drone manager.

    @param logfile: path to redirect stdout/stderr to, or None/'' to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if _testing_mode:
        # point at the stress-test database instead of the real one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # renamed from 'drones' so we don't shadow the drones module imported at
    # the top of this file
    drone_spec = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_spec.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000177
178
def enable_logging(logfile):
    """Redirect stdout/stderr to unbuffered, append-mode log files.

    @param logfile: stdout goes to this path; stderr to logfile + '.err'.
    """
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)

    stdout_file = open(stdout_path, "a", buffering=0)
    stderr_file = open(stderr_path, "a", buffering=0)

    # rebind the underlying file descriptors too, so output written by child
    # processes (which inherit the fds) also lands in the log files
    os.dup2(stdout_file.fileno(), sys.stdout.fileno())
    os.dup2(stderr_file.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_file
    sys.stderr = stderr_file
mbligh36768f02008-02-22 18:28:33 +0000191
192
def queue_entries_to_abort():
    """Return a HostQueueEntry for every row currently in 'Abort' status."""
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)
    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000200
showard7cf9a9b2008-05-15 21:15:52 +0000201
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    For example, when more than one atomic group label is found on a single
    host (see HostScheduler._get_host_atomic_group_id).
    """
204
205
class HostScheduler(object):
    """Matches pending HostQueueEntries against ready, eligible hosts.

    refresh() must be called once per scheduling cycle to snapshot host,
    ACL, label and dependency state from the database before
    find_eligible_host() / find_eligible_atomic_group() are used. The
    snapshot is mutated as hosts are handed out, so each host is assigned
    at most once per cycle.
    """

    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as a comma-separated string for use in SQL IN (...) lists
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        # run a two-column query (templated with %s for the id list) and
        # fold the rows into a {left_id: set(right_ids)} mapping
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        # build {left_id: set(right_ids)} from two-column rows; flip=True
        # swaps the direction of the mapping
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # job id -> set of ACL group ids that the job's owner belongs to
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # job id -> set of host ids the job is blocked from running on
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # job id -> set of label ids the job depends on
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # host id -> set of ACL group ids the host belongs to
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        # returns both directions of the hosts<->labels relation:
        # (label_id -> host ids, host_id -> label ids)
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        # snapshot all scheduling-relevant DB state for this cycle
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # a host is eligible when ACLs, dependency labels, only_if_needed
        # labels and atomic group membership all line up
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # the entry names a specific host; hand it out if it is eligible and
        # still available this cycle
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        # pick any usable, eligible host carrying the requested label
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            # fix: was a call to undefined name 'bprint'; use logging.error
            logging.error('Error: job %d synch_count=%d > requested atomic_group %d '
                          'max_number_of_machines=%d. Aborted host_queue_entry %d.',
                          job.id, job.synch_count, atomic_group.id,
                          atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
549
550
showard170873e2009-01-07 00:22:26 +0000551class Dispatcher(object):
    def __init__(self):
        # all active Agent instances, in no particular order
        self._agents = []
        # timestamp of the last periodic cleanup pass (see _run_cleanup_maybe)
        self._last_clean_time = time.time()
        self._host_scheduler = HostScheduler()
        # host id -> set of Agents currently touching that host
        self._host_agents = {}
        # queue entry id -> set of Agents currently touching that entry
        self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000558
mbligh36768f02008-02-22 18:28:33 +0000559
jadmanski0afbb632008-06-06 21:10:57 +0000560 def do_initial_recovery(self, recover_hosts=True):
561 # always recover processes
562 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000563
jadmanski0afbb632008-06-06 21:10:57 +0000564 if recover_hosts:
565 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000566
567
    def tick(self):
        # one pass of the main scheduling loop; the ordering here matters:
        # refresh drone state first, schedule, then flush drone actions/emails
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000576
showard97aed502008-11-04 02:01:24 +0000577
showarda3ab0d52008-11-03 19:03:47 +0000578 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000579 should_cleanup = (self._last_clean_time +
580 scheduler_config.config.clean_interval * 60 <
581 time.time())
582 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000583 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000584 self._abort_timed_out_jobs()
585 self._abort_jobs_past_synch_start_timeout()
586 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000587 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000588 self._last_clean_time = time.time()
589
mbligh36768f02008-02-22 18:28:33 +0000590
showard170873e2009-01-07 00:22:26 +0000591 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
592 for object_id in object_ids:
593 agent_dict.setdefault(object_id, set()).add(agent)
594
595
596 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
597 for object_id in object_ids:
598 assert object_id in agent_dict
599 agent_dict[object_id].remove(agent)
600
601
    def add_agent(self, agent):
        # take ownership of the agent and index it by the hosts and queue
        # entries it touches, so conflicting work can be detected later
        self._agents.append(agent)
        agent.dispatcher = self
        self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
        self._register_agent_for_ids(self._queue_entry_agents,
                                     agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000608
showard170873e2009-01-07 00:22:26 +0000609
610 def get_agents_for_entry(self, queue_entry):
611 """
612 Find agents corresponding to the specified queue_entry.
613 """
614 return self._queue_entry_agents.get(queue_entry.id, set())
615
616
617 def host_has_agent(self, host):
618 """
619 Determine if there is currently an Agent present using this host.
620 """
621 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000622
623
    def remove_agent(self, agent):
        # drop the agent and clear its host/queue-entry bookkeeping entries
        self._agents.remove(agent)
        self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
                                       agent)
        self._unregister_agent_for_ids(self._queue_entry_agents,
                                       agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000630
631
showard4c5374f2008-09-04 17:02:56 +0000632 def num_running_processes(self):
633 return sum(agent.num_processes for agent in self._agents
634 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000635
636
showard170873e2009-01-07 00:22:26 +0000637 def _extract_execution_tag(self, command_line):
638 match = re.match(r'.* -P (\S+) ', command_line)
639 if not match:
640 return None
641 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000642
643
    def _recover_queue_entries(self, queue_entries, run_monitor):
        # re-attach an agent to a group of entries whose autoserv process is
        # still alive; run_monitor is already attached to that process
        assert len(queue_entries) > 0
        queue_task = RecoveryQueueTask(job=queue_entries[0].job,
                                       queue_entries=queue_entries,
                                       run_monitor=run_monitor)
        self.add_agent(Agent(tasks=[queue_task],
                             num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000651
652
    def _recover_processes(self):
        # ordering matters: pidfiles must be registered before the refresh
        # that reads them, and entry recovery before host reverification
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def _register_pidfiles(self):
668 # during recovery we may need to read pidfiles for both running and
669 # parsing entries
670 queue_entries = HostQueueEntry.fetch(
671 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000672 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000673 pidfile_id = _drone_manager.get_pidfile_id_from(
674 queue_entry.execution_tag())
675 _drone_manager.register_pidfile(pidfile_id)
676
677
678 def _recover_running_entries(self):
679 orphans = _drone_manager.get_orphaned_autoserv_processes()
680
681 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
682 requeue_entries = []
683 for queue_entry in queue_entries:
684 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000685 # synchronous job we've already recovered
686 continue
showard170873e2009-01-07 00:22:26 +0000687 execution_tag = queue_entry.execution_tag()
688 run_monitor = PidfileRunMonitor()
689 run_monitor.attach_to_existing_process(execution_tag)
690 if not run_monitor.has_process():
691 # autoserv apparently never got run, so let it get requeued
692 continue
showarde788ea62008-11-17 21:02:47 +0000693 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000694 logging.info('Recovering %s (process %s)',
695 (', '.join(str(entry) for entry in queue_entries),
696 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000697 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000698 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000699
jadmanski0afbb632008-06-06 21:10:57 +0000700 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000701 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000702 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000703 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000704
showard170873e2009-01-07 00:22:26 +0000705
706 def _recover_aborting_entries(self):
707 queue_entries = HostQueueEntry.fetch(
708 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000709 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000710 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000711 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000712
showard97aed502008-11-04 02:01:24 +0000713
showard170873e2009-01-07 00:22:26 +0000714 def _requeue_other_active_entries(self):
715 queue_entries = HostQueueEntry.fetch(
716 where='active AND NOT complete AND status != "Pending"')
717 for queue_entry in queue_entries:
718 if self.get_agents_for_entry(queue_entry):
719 # entry has already been recovered
720 continue
showardb18134f2009-03-20 20:52:18 +0000721 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
722 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000723 if queue_entry.host:
724 tasks = queue_entry.host.reverify_tasks()
725 self.add_agent(Agent(tasks))
726 agent = queue_entry.requeue()
727
728
    def _reverify_remaining_hosts(self):
        """\
        Schedule reverification for hosts left in a transient state by a
        scheduler restart.
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000744
745
jadmanski0afbb632008-06-06 21:10:57 +0000746 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000747 print_message='Reverifying host %s'):
748 full_where='locked = 0 AND invalid = 0 AND ' + where
749 for host in Host.fetch(where=full_where):
750 if self.host_has_agent(host):
751 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000752 continue
showard170873e2009-01-07 00:22:26 +0000753 if print_message:
showardb18134f2009-03-20 20:52:18 +0000754 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000755 tasks = host.reverify_tasks()
756 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000757
758
showard97aed502008-11-04 02:01:24 +0000759 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000760 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000761 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000762 if entry.id in recovered_entry_ids:
763 continue
764 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000765 recovered_entry_ids = recovered_entry_ids.union(
766 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000767 logging.info('Recovering parsing entries %s',
768 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000769
770 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000771 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000772
773
jadmanski0afbb632008-06-06 21:10:57 +0000774 def _recover_hosts(self):
775 # recover "Repair Failed" hosts
776 message = 'Reverifying dead host %s'
777 self._reverify_hosts_where("status = 'Repair Failed'",
778 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000779
780
showard3bb499f2008-07-03 19:42:20 +0000781 def _abort_timed_out_jobs(self):
782 """
783 Aborts all jobs that have timed out and not completed
784 """
showarda3ab0d52008-11-03 19:03:47 +0000785 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
786 where=['created_on + INTERVAL timeout HOUR < NOW()'])
787 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000788 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000789 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000790
791
showard98863972008-10-29 21:14:56 +0000792 def _abort_jobs_past_synch_start_timeout(self):
793 """
794 Abort synchronous jobs that are past the start timeout (from global
795 config) and are holding a machine that's in everyone.
796 """
797 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000798 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000799 timeout_start = datetime.datetime.now() - timeout_delta
800 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000801 created_on__lt=timeout_start,
802 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000803 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000804 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000805 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000806 entries_to_abort = job.hostqueueentry_set.exclude(
807 status=models.HostQueueEntry.Status.RUNNING)
808 for queue_entry in entries_to_abort:
809 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000810
811
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        # left-join against jobs that still have incomplete entries; rows
        # with no match (hqe.job_id IS NULL) belong to finished jobs and
        # are deleted
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000824
825
    def _get_pending_queue_entries(self):
        """\
        Return a list of all Queued, inactive, incomplete HostQueueEntries,
        ordered by job priority (highest first), then meta_host, then FIFO
        by job id.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000832
833
showard89f84db2009-03-12 20:39:13 +0000834 def _refresh_pending_queue_entries(self):
835 """
836 Lookup the pending HostQueueEntries and call our HostScheduler
837 refresh() method given that list. Return the list.
838
839 @returns A list of pending HostQueueEntries sorted in priority order.
840 """
showard63a34772008-08-18 19:32:50 +0000841 queue_entries = self._get_pending_queue_entries()
842 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000843 return []
showardb95b1bd2008-08-15 18:11:04 +0000844
showard63a34772008-08-18 19:32:50 +0000845 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000846
showard89f84db2009-03-12 20:39:13 +0000847 return queue_entries
848
849
850 def _schedule_atomic_group(self, queue_entry):
851 """
852 Schedule the given queue_entry on an atomic group of hosts.
853
854 Returns immediately if there are insufficient available hosts.
855
856 Creates new HostQueueEntries based off of queue_entry for the
857 scheduled hosts and starts them all running.
858 """
859 # This is a virtual host queue entry representing an entire
860 # atomic group, find a group and schedule their hosts.
861 group_hosts = self._host_scheduler.find_eligible_atomic_group(
862 queue_entry)
863 if not group_hosts:
864 return
865 # The first assigned host uses the original HostQueueEntry
866 group_queue_entries = [queue_entry]
867 for assigned_host in group_hosts[1:]:
868 # Create a new HQE for every additional assigned_host.
869 new_hqe = HostQueueEntry.clone(queue_entry)
870 new_hqe.save()
871 group_queue_entries.append(new_hqe)
872 assert len(group_queue_entries) == len(group_hosts)
873 for queue_entry, host in itertools.izip(group_queue_entries,
874 group_hosts):
875 self._run_queue_entry(queue_entry, host)
876
877
878 def _schedule_new_jobs(self):
879 queue_entries = self._refresh_pending_queue_entries()
880 if not queue_entries:
881 return
882
showard63a34772008-08-18 19:32:50 +0000883 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000884 if (queue_entry.atomic_group_id is None or
885 queue_entry.host_id is not None):
886 assigned_host = self._host_scheduler.find_eligible_host(
887 queue_entry)
888 if assigned_host:
889 self._run_queue_entry(queue_entry, assigned_host)
890 else:
891 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000892
893
894 def _run_queue_entry(self, queue_entry, host):
895 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000896 # in some cases (synchronous jobs with run_verify=False), agent may be
897 # None
showard9976ce92008-10-15 20:28:13 +0000898 if agent:
899 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000900
901
jadmanski0afbb632008-06-06 21:10:57 +0000902 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000903 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000904 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000905 for agent in agents_to_abort:
906 self.remove_agent(agent)
907
showard170873e2009-01-07 00:22:26 +0000908 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000909
910
showard324bf812009-01-20 23:23:38 +0000911 def _can_start_agent(self, agent, num_started_this_cycle,
912 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000913 # always allow zero-process agents to run
914 if agent.num_processes == 0:
915 return True
916 # don't allow any nonzero-process agents to run after we've reached a
917 # limit (this avoids starvation of many-process agents)
918 if have_reached_limit:
919 return False
920 # total process throttling
showard324bf812009-01-20 23:23:38 +0000921 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000922 return False
923 # if a single agent exceeds the per-cycle throttling, still allow it to
924 # run when it's the first agent in the cycle
925 if num_started_this_cycle == 0:
926 return True
927 # per-cycle throttling
928 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000929 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000930 return False
931 return True
932
933
jadmanski0afbb632008-06-06 21:10:57 +0000934 def _handle_agents(self):
jadmanski0afbb632008-06-06 21:10:57 +0000935 num_started_this_cycle = 0
showard4c5374f2008-09-04 17:02:56 +0000936 have_reached_limit = False
937 # iterate over copy, so we can remove agents during iteration
938 for agent in list(self._agents):
939 if agent.is_done():
showardb18134f2009-03-20 20:52:18 +0000940 logging.info("agent finished")
showard170873e2009-01-07 00:22:26 +0000941 self.remove_agent(agent)
showard4c5374f2008-09-04 17:02:56 +0000942 continue
943 if not agent.is_running():
showard324bf812009-01-20 23:23:38 +0000944 if not self._can_start_agent(agent, num_started_this_cycle,
showard4c5374f2008-09-04 17:02:56 +0000945 have_reached_limit):
946 have_reached_limit = True
947 continue
showard4c5374f2008-09-04 17:02:56 +0000948 num_started_this_cycle += agent.num_processes
949 agent.tick()
showardb18134f2009-03-20 20:52:18 +0000950 logging.info('%d running processes',
951 _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000952
953
showardfa8629c2008-11-04 16:51:23 +0000954 def _check_for_db_inconsistencies(self):
955 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
956 if query.count() != 0:
957 subject = ('%d queue entries found with active=complete=1'
958 % query.count())
959 message = '\n'.join(str(entry.get_object_dict())
960 for entry in query[:50])
961 if len(query) > 50:
962 message += '\n(truncated)\n'
963
showardb18134f2009-03-20 20:52:18 +0000964 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000965 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000966
967
class PidfileRunMonitor(object):
    """
    Tracks a single autoserv process through its pidfile on a drone.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process)
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        """Prefix command with 'nice -n <level>' when a level is given."""
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """Start a new process on a drone and begin monitoring its pidfile."""
        assert command is not None
        # use the shared helper instead of duplicating the nice-prefix logic
        # inline (a nice level of 0 is a priority no-op either way)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Monitor an already-running process found via execution_tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True once a process has appeared in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; has_process() must be true."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            # reset our state before reporting the problem
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        """Log + email the error and give up on the process."""
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the exception object itself is unused (the old code bound it
            # to a never-read name); the traceback carries the detail
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count recorded in the pidfile."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001123
1124
class Agent(object):
    """\
    Runs a queue of AgentTasks one at a time.  When the active task fails,
    the rest of the queue is discarded and the task's failure_tasks are
    handed to the dispatcher as a brand-new Agent (so id bookkeeping is
    reset).
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten the per-task id lists into a single set
        combined = set()
        for ids in id_lists:
            combined.update(ids)
        return combined


    def add_task(self, task):
        """Append task to the queue and point it back at this agent."""
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        """Advance until a task is mid-flight or everything is finished."""
        while not self.is_done():
            current = self.active_task
            if current and not current.is_done():
                current.poll()
                if not current.is_done():
                    return
            self._next_task()


    def _next_task(self):
        logging.info("agent picking task")
        finished = self.active_task
        if finished:
            assert finished.is_done()
            if not finished.success:
                self.on_task_failure()

        self.active_task = None
        if self.is_done():
            return
        self.active_task = self.queue.get_nowait()
        if self.active_task:
            self.active_task.start()


    def on_task_failure(self):
        # discard everything still queued
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids
        # will get reset.
        self.dispatcher.add_agent(Agent(self.active_task.failure_tasks))


    def is_running(self):
        """True while a task is active."""
        return self.active_task is not None


    def is_done(self):
        """True when no task is active and none remain queued."""
        return self.active_task is None and self.queue.empty()


    def start(self):
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001193
jadmanski0afbb632008-06-06 21:10:57 +00001194
class AgentTask(object):
    """\
    A single unit of work executed by an Agent, optionally backed by a
    monitored autoserv process.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """\
        @param cmd: command (as a list) to execute, or None for tasks with
                no process of their own.
        @param working_directory: directory the command runs in.
        @param failure_tasks: tasks to run (in a new Agent) if this one
                fails.
        """
        self.done = False
        # use a None sentinel so each task gets its own list; the previous
        # mutable default of [] was shared between every instance
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        """Record the host/queue-entry ids this task operates on."""
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check up on the task; with no monitor it fails immediately."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        """Process one exit-code sample; None means still running."""
        if exit_code is None:
            return
        # exit code 0 is the only success indication
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: suffix is accepted for backward compatibility but is no
        # longer used; the drone manager hands out the temporary path
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        """Copy the log file into the results repository, when one exists."""
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the underlying process (if any) and mark the task done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        """Point log_file at a per-host log named <timestamp>.<base_name>."""
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the shared execution tag, asserting all entries agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and kick off a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        """Launch the task's command under a PidfileRunMonitor, if any."""
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001317
1318
class RepairTask(AgentTask):
    # Runs autoserv in repair mode (-R) against a single host; on failure
    # the host is marked Repair Failed and the triggering queue entry (if
    # any, and if still Queued) is failed with its logs copied into place.
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        if self.queue_entry_to_fail:
            # requeue the entry while the repair runs; _fail_queue_entry()
            # in epilog() handles it if the repair ultimately fails
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return # don't fail metahost entries, they'll be reassigned

        # refresh from the DB -- the entry may have changed state while the
        # repair was running
        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001379
1380
showard8fe93b52008-11-18 17:53:22 +00001381class PreJobTask(AgentTask):
showard170873e2009-01-07 00:22:26 +00001382 def epilog(self):
1383 super(PreJobTask, self).epilog()
showard8fe93b52008-11-18 17:53:22 +00001384 should_copy_results = (self.queue_entry and not self.success
1385 and not self.queue_entry.meta_host)
1386 if should_copy_results:
1387 self.queue_entry.set_execution_subdir()
showard170873e2009-01-07 00:22:26 +00001388 destination = os.path.join(self.queue_entry.execution_tag(),
1389 os.path.basename(self.log_file))
1390 _drone_manager.copy_to_results_repository(
1391 self.monitor.get_process(), self.log_file,
1392 destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001393
1394
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to check that a host is in working order.

    A RepairTask is registered as the follow-up task to run on failure.
    """
    def __init__(self, queue_entry=None, host=None):
        # Exactly one of queue_entry/host may be supplied.
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        verify_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                      '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        on_failure = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(verify_cmd, self.temp_results_dir,
                                         failure_tasks=on_failure)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001425
1426
mbligh36768f02008-02-22 18:28:33 +00001427class QueueTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001428 def __init__(self, job, queue_entries, cmd):
jadmanski0afbb632008-06-06 21:10:57 +00001429 self.job = job
1430 self.queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001431 super(QueueTask, self).__init__(cmd, self._execution_tag())
1432 self._set_ids(queue_entries=queue_entries)
mbligh36768f02008-02-22 18:28:33 +00001433
1434
showard170873e2009-01-07 00:22:26 +00001435 def _format_keyval(self, key, value):
1436 return '%s=%s' % (key, value)
mbligh36768f02008-02-22 18:28:33 +00001437
1438
showard73ec0442009-02-07 02:05:20 +00001439 def _keyval_path(self):
1440 return os.path.join(self._execution_tag(), 'keyval')
1441
1442
1443 def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
1444 keyval_contents = '\n'.join(self._format_keyval(key, value)
1445 for key, value in keyval_dict.iteritems())
1446 # always end with a newline to allow additional keyvals to be written
1447 keyval_contents += '\n'
1448 _drone_manager.attach_file_to_execution(self._execution_tag(),
1449 keyval_contents,
1450 file_path=keyval_path)
1451
1452
1453 def _write_keyvals_before_job(self, keyval_dict):
1454 self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())
1455
1456
1457 def _write_keyval_after_job(self, field, value):
showard170873e2009-01-07 00:22:26 +00001458 assert self.monitor and self.monitor.has_process()
showard170873e2009-01-07 00:22:26 +00001459 _drone_manager.write_lines_to_file(
showard73ec0442009-02-07 02:05:20 +00001460 self._keyval_path(), [self._format_keyval(field, value)],
showard35162b02009-03-03 02:17:30 +00001461 paired_with_process=self.monitor.get_process())
showardd8e548a2008-09-09 03:04:57 +00001462
1463
showard170873e2009-01-07 00:22:26 +00001464 def _write_host_keyvals(self, host):
1465 keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
1466 host.hostname)
1467 platform, all_labels = host.platform_and_labels()
showard73ec0442009-02-07 02:05:20 +00001468 keyval_dict = dict(platform=platform, labels=','.join(all_labels))
1469 self._write_keyvals_before_job_helper(keyval_dict, keyval_path)
showardd8e548a2008-09-09 03:04:57 +00001470
1471
showard170873e2009-01-07 00:22:26 +00001472 def _execution_tag(self):
1473 return self.queue_entries[0].execution_tag()
mblighbb421852008-03-11 22:36:16 +00001474
1475
jadmanski0afbb632008-06-06 21:10:57 +00001476 def prolog(self):
showard73ec0442009-02-07 02:05:20 +00001477 queued = int(time.mktime(self.job.created_on.timetuple()))
1478 self._write_keyvals_before_job({'job_queued': queued})
jadmanski0afbb632008-06-06 21:10:57 +00001479 for queue_entry in self.queue_entries:
showard170873e2009-01-07 00:22:26 +00001480 self._write_host_keyvals(queue_entry.host)
jadmanski0afbb632008-06-06 21:10:57 +00001481 queue_entry.set_status('Running')
1482 queue_entry.host.set_status('Running')
showard21baa452008-10-21 00:08:39 +00001483 queue_entry.host.update_field('dirty', 1)
showard2bab8f42008-11-12 18:15:22 +00001484 if self.job.synch_count == 1:
jadmanski0afbb632008-06-06 21:10:57 +00001485 assert len(self.queue_entries) == 1
1486 self.job.write_to_machines_file(self.queue_entries[0])
mbligh36768f02008-02-22 18:28:33 +00001487
1488
showard35162b02009-03-03 02:17:30 +00001489 def _write_lost_process_error_file(self):
1490 error_file_path = os.path.join(self._execution_tag(), 'job_failure')
1491 _drone_manager.write_lines_to_file(error_file_path,
1492 [_LOST_PROCESS_ERROR])
1493
1494
showard97aed502008-11-04 02:01:24 +00001495 def _finish_task(self, success):
showard35162b02009-03-03 02:17:30 +00001496 if self.monitor.has_process():
1497 self._write_keyval_after_job("job_finished", int(time.time()))
1498 self._copy_and_parse_results(self.queue_entries)
1499
1500 if self.monitor.lost_process:
1501 self._write_lost_process_error_file()
1502 for queue_entry in self.queue_entries:
1503 queue_entry.set_status(models.HostQueueEntry.Status.FAILED)
jadmanskif7fa2cc2008-10-01 14:13:23 +00001504
1505
showardcbd74612008-11-19 21:42:02 +00001506 def _write_status_comment(self, comment):
showard170873e2009-01-07 00:22:26 +00001507 _drone_manager.write_lines_to_file(
1508 os.path.join(self._execution_tag(), 'status.log'),
1509 ['INFO\t----\t----\t' + comment],
showard35162b02009-03-03 02:17:30 +00001510 paired_with_process=self.monitor.get_process())
showardcbd74612008-11-19 21:42:02 +00001511
1512
jadmanskif7fa2cc2008-10-01 14:13:23 +00001513 def _log_abort(self):
showard170873e2009-01-07 00:22:26 +00001514 if not self.monitor or not self.monitor.has_process():
1515 return
1516
jadmanskif7fa2cc2008-10-01 14:13:23 +00001517 # build up sets of all the aborted_by and aborted_on values
1518 aborted_by, aborted_on = set(), set()
1519 for queue_entry in self.queue_entries:
1520 if queue_entry.aborted_by:
1521 aborted_by.add(queue_entry.aborted_by)
1522 t = int(time.mktime(queue_entry.aborted_on.timetuple()))
1523 aborted_on.add(t)
1524
1525 # extract some actual, unique aborted by value and write it out
1526 assert len(aborted_by) <= 1
1527 if len(aborted_by) == 1:
showardcbd74612008-11-19 21:42:02 +00001528 aborted_by_value = aborted_by.pop()
1529 aborted_on_value = max(aborted_on)
1530 else:
1531 aborted_by_value = 'autotest_system'
1532 aborted_on_value = int(time.time())
showard170873e2009-01-07 00:22:26 +00001533
showarda0382352009-02-11 23:36:43 +00001534 self._write_keyval_after_job("aborted_by", aborted_by_value)
1535 self._write_keyval_after_job("aborted_on", aborted_on_value)
showard170873e2009-01-07 00:22:26 +00001536
showardcbd74612008-11-19 21:42:02 +00001537 aborted_on_string = str(datetime.datetime.fromtimestamp(
1538 aborted_on_value))
1539 self._write_status_comment('Job aborted by %s on %s' %
1540 (aborted_by_value, aborted_on_string))
jadmanskic2ac77f2008-05-16 21:44:04 +00001541
1542
jadmanski0afbb632008-06-06 21:10:57 +00001543 def abort(self):
1544 super(QueueTask, self).abort()
jadmanskif7fa2cc2008-10-01 14:13:23 +00001545 self._log_abort()
showard97aed502008-11-04 02:01:24 +00001546 self._finish_task(False)
jadmanskic2ac77f2008-05-16 21:44:04 +00001547
1548
showard21baa452008-10-21 00:08:39 +00001549 def _reboot_hosts(self):
1550 reboot_after = self.job.reboot_after
1551 do_reboot = False
showard0fc38302008-10-23 00:44:07 +00001552 if reboot_after == models.RebootAfter.ALWAYS:
showard21baa452008-10-21 00:08:39 +00001553 do_reboot = True
showard0fc38302008-10-23 00:44:07 +00001554 elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
showard21baa452008-10-21 00:08:39 +00001555 num_tests_failed = self.monitor.num_tests_failed()
1556 do_reboot = (self.success and num_tests_failed == 0)
1557
showard8ebca792008-11-04 21:54:22 +00001558 for queue_entry in self.queue_entries:
1559 if do_reboot:
showard45ae8192008-11-05 19:32:53 +00001560 # don't pass the queue entry to the CleanupTask. if the cleanup
showardfa8629c2008-11-04 16:51:23 +00001561 # fails, the job doesn't care -- it's over.
showard45ae8192008-11-05 19:32:53 +00001562 cleanup_task = CleanupTask(host=queue_entry.get_host())
1563 self.agent.dispatcher.add_agent(Agent([cleanup_task]))
showard8ebca792008-11-04 21:54:22 +00001564 else:
1565 queue_entry.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001566
1567
jadmanski0afbb632008-06-06 21:10:57 +00001568 def epilog(self):
1569 super(QueueTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001570 self._finish_task(self.success)
showard21baa452008-10-21 00:08:39 +00001571 self._reboot_hosts()
mblighbb421852008-03-11 22:36:16 +00001572
showardb18134f2009-03-20 20:52:18 +00001573 logging.info("queue_task finished with succes=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001574
1575
mblighbb421852008-03-11 22:36:16 +00001576class RecoveryQueueTask(QueueTask):
jadmanski0afbb632008-06-06 21:10:57 +00001577 def __init__(self, job, queue_entries, run_monitor):
showard170873e2009-01-07 00:22:26 +00001578 super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
jadmanski0afbb632008-06-06 21:10:57 +00001579 self.run_monitor = run_monitor
mblighbb421852008-03-11 22:36:16 +00001580
1581
jadmanski0afbb632008-06-06 21:10:57 +00001582 def run(self):
1583 self.monitor = self.run_monitor
mblighbb421852008-03-11 22:36:16 +00001584
1585
jadmanski0afbb632008-06-06 21:10:57 +00001586 def prolog(self):
1587 # recovering an existing process - don't do prolog
1588 pass
mblighbb421852008-03-11 22:36:16 +00001589
1590
showard8fe93b52008-11-18 17:53:22 +00001591class CleanupTask(PreJobTask):
showardfa8629c2008-11-04 16:51:23 +00001592 def __init__(self, host=None, queue_entry=None):
1593 assert bool(host) ^ bool(queue_entry)
1594 if queue_entry:
1595 host = queue_entry.get_host()
showardfa8629c2008-11-04 16:51:23 +00001596 self.queue_entry = queue_entry
jadmanski0afbb632008-06-06 21:10:57 +00001597 self.host = host
showard170873e2009-01-07 00:22:26 +00001598
1599 self.create_temp_resultsdir('.cleanup')
1600 self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
1601 '-r', _drone_manager.absolute_path(self.temp_results_dir)]
showarde788ea62008-11-17 21:02:47 +00001602 repair_task = RepairTask(host, queue_entry=queue_entry)
showard170873e2009-01-07 00:22:26 +00001603 super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
1604 failure_tasks=[repair_task])
1605
1606 self._set_ids(host=host, queue_entries=[queue_entry])
1607 self.set_host_log_file('cleanup', self.host)
mbligh16c722d2008-03-05 00:58:44 +00001608
mblighd5c95802008-03-05 00:33:46 +00001609
jadmanski0afbb632008-06-06 21:10:57 +00001610 def prolog(self):
showard8fe93b52008-11-18 17:53:22 +00001611 super(CleanupTask, self).prolog()
showardb18134f2009-03-20 20:52:18 +00001612 logging.info("starting cleanup task for host: %s", self.host.hostname)
showard45ae8192008-11-05 19:32:53 +00001613 self.host.set_status("Cleaning")
mblighd5c95802008-03-05 00:33:46 +00001614
mblighd5c95802008-03-05 00:33:46 +00001615
showard21baa452008-10-21 00:08:39 +00001616 def epilog(self):
showard45ae8192008-11-05 19:32:53 +00001617 super(CleanupTask, self).epilog()
showard21baa452008-10-21 00:08:39 +00001618 if self.success:
showardfa8629c2008-11-04 16:51:23 +00001619 self.host.set_status('Ready')
showard21baa452008-10-21 00:08:39 +00001620 self.host.update_field('dirty', 0)
1621
1622
mblighd5c95802008-03-05 00:33:46 +00001623class AbortTask(AgentTask):
jadmanski0afbb632008-06-06 21:10:57 +00001624 def __init__(self, queue_entry, agents_to_abort):
jadmanski0afbb632008-06-06 21:10:57 +00001625 super(AbortTask, self).__init__('')
showard170873e2009-01-07 00:22:26 +00001626 self.queue_entry = queue_entry
1627 # don't use _set_ids, since we don't want to set the host_ids
1628 self.queue_entry_ids = [queue_entry.id]
1629 self.agents_to_abort = agents_to_abort
mbligh36768f02008-02-22 18:28:33 +00001630
1631
jadmanski0afbb632008-06-06 21:10:57 +00001632 def prolog(self):
showardb18134f2009-03-20 20:52:18 +00001633 logging.info("starting abort on host %s, job %s",
1634 self.queue_entry.host_id, self.queue_entry.job_id)
mbligh36768f02008-02-22 18:28:33 +00001635
mblighd64e5702008-04-04 21:39:28 +00001636
jadmanski0afbb632008-06-06 21:10:57 +00001637 def epilog(self):
1638 super(AbortTask, self).epilog()
1639 self.queue_entry.set_status('Aborted')
1640 self.success = True
1641
1642
1643 def run(self):
1644 for agent in self.agents_to_abort:
1645 if (agent.active_task):
1646 agent.active_task.abort()
mbligh36768f02008-02-22 18:28:33 +00001647
1648
showard97aed502008-11-04 02:01:24 +00001649class FinalReparseTask(AgentTask):
showard97aed502008-11-04 02:01:24 +00001650 _num_running_parses = 0
1651
1652 def __init__(self, queue_entries):
1653 self._queue_entries = queue_entries
showard170873e2009-01-07 00:22:26 +00001654 # don't use _set_ids, since we don't want to set the host_ids
1655 self.queue_entry_ids = [entry.id for entry in queue_entries]
showard97aed502008-11-04 02:01:24 +00001656 self._parse_started = False
1657
1658 assert len(queue_entries) > 0
1659 queue_entry = queue_entries[0]
showard97aed502008-11-04 02:01:24 +00001660
showard170873e2009-01-07 00:22:26 +00001661 self._execution_tag = queue_entry.execution_tag()
1662 self._results_dir = _drone_manager.absolute_path(self._execution_tag)
1663 self._autoserv_monitor = PidfileRunMonitor()
1664 self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
1665 self._final_status = self._determine_final_status()
1666
showard97aed502008-11-04 02:01:24 +00001667 if _testing_mode:
1668 self.cmd = 'true'
showard170873e2009-01-07 00:22:26 +00001669 else:
1670 super(FinalReparseTask, self).__init__(
1671 cmd=self._generate_parse_command(),
1672 working_directory=self._execution_tag)
showard97aed502008-11-04 02:01:24 +00001673
showard170873e2009-01-07 00:22:26 +00001674 self.log_file = os.path.join(self._execution_tag, '.parse.log')
showard97aed502008-11-04 02:01:24 +00001675
1676
1677 @classmethod
1678 def _increment_running_parses(cls):
1679 cls._num_running_parses += 1
1680
1681
1682 @classmethod
1683 def _decrement_running_parses(cls):
1684 cls._num_running_parses -= 1
1685
1686
1687 @classmethod
1688 def _can_run_new_parse(cls):
showardd1ee1dd2009-01-07 21:33:08 +00001689 return (cls._num_running_parses <
1690 scheduler_config.config.max_parse_processes)
showard97aed502008-11-04 02:01:24 +00001691
1692
showard170873e2009-01-07 00:22:26 +00001693 def _determine_final_status(self):
1694 # we'll use a PidfileRunMonitor to read the autoserv exit status
1695 if self._autoserv_monitor.exit_code() == 0:
1696 return models.HostQueueEntry.Status.COMPLETED
1697 return models.HostQueueEntry.Status.FAILED
1698
1699
showard97aed502008-11-04 02:01:24 +00001700 def prolog(self):
1701 super(FinalReparseTask, self).prolog()
1702 for queue_entry in self._queue_entries:
1703 queue_entry.set_status(models.HostQueueEntry.Status.PARSING)
1704
1705
1706 def epilog(self):
1707 super(FinalReparseTask, self).epilog()
showard97aed502008-11-04 02:01:24 +00001708 for queue_entry in self._queue_entries:
showard170873e2009-01-07 00:22:26 +00001709 queue_entry.set_status(self._final_status)
showard97aed502008-11-04 02:01:24 +00001710
1711
showard2bab8f42008-11-12 18:15:22 +00001712 def _generate_parse_command(self):
showard170873e2009-01-07 00:22:26 +00001713 return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
1714 self._results_dir]
showard97aed502008-11-04 02:01:24 +00001715
1716
1717 def poll(self):
1718 # override poll to keep trying to start until the parse count goes down
1719 # and we can, at which point we revert to default behavior
1720 if self._parse_started:
1721 super(FinalReparseTask, self).poll()
1722 else:
1723 self._try_starting_parse()
1724
1725
1726 def run(self):
1727 # override run() to not actually run unless we can
1728 self._try_starting_parse()
1729
1730
1731 def _try_starting_parse(self):
1732 if not self._can_run_new_parse():
1733 return
showard170873e2009-01-07 00:22:26 +00001734
showard678df4f2009-02-04 21:36:39 +00001735 # make sure we actually have results to parse
showard35162b02009-03-03 02:17:30 +00001736 # this should never happen in normal operation
showard678df4f2009-02-04 21:36:39 +00001737 if not self._autoserv_monitor.has_process():
1738 email_manager.manager.enqueue_notify_email(
1739 'No results to parse',
1740 'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
1741 self.finished(False)
1742 return
1743
showard97aed502008-11-04 02:01:24 +00001744 # actually run the parse command
showard170873e2009-01-07 00:22:26 +00001745 self.monitor = PidfileRunMonitor()
1746 self.monitor.run(self.cmd, self._working_directory,
1747 log_file=self.log_file,
1748 pidfile_name='.parser_execute',
1749 paired_with_pidfile=self._autoserv_monitor.pidfile_id)
1750
showard97aed502008-11-04 02:01:24 +00001751 self._increment_running_parses()
1752 self._parse_started = True
1753
1754
1755 def finished(self, success):
1756 super(FinalReparseTask, self).finished(success)
showard678df4f2009-02-04 21:36:39 +00001757 if self._parse_started:
1758 self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001759
1760
showardc9ae1782009-01-30 01:42:37 +00001761class SetEntryPendingTask(AgentTask):
1762 def __init__(self, queue_entry):
1763 super(SetEntryPendingTask, self).__init__(cmd='')
1764 self._queue_entry = queue_entry
1765 self._set_ids(queue_entries=[queue_entry])
1766
1767
1768 def run(self):
1769 agent = self._queue_entry.on_pending()
1770 if agent:
1771 self.agent.dispatcher.add_agent(agent)
1772 self.finished(True)
1773
1774
showarda3c58572009-03-12 20:36:59 +00001775class DBError(Exception):
1776 """Raised by the DBObject constructor when its select fails."""
1777
1778
mbligh36768f02008-02-22 18:28:33 +00001779class DBObject(object):
showarda3c58572009-03-12 20:36:59 +00001780 """A miniature object relational model for the database."""
showard6ae5ea92009-02-25 00:11:51 +00001781
1782 # Subclasses MUST override these:
1783 _table_name = ''
1784 _fields = ()
1785
showarda3c58572009-03-12 20:36:59 +00001786 # A mapping from (type, id) to the instance of the object for that
1787 # particular id. This prevents us from creating new Job() and Host()
1788 # instances for every HostQueueEntry object that we instantiate as
1789 # multiple HQEs often share the same Job.
1790 _instances_by_type_and_id = weakref.WeakValueDictionary()
1791 _initialized = False
showard6ae5ea92009-02-25 00:11:51 +00001792
showarda3c58572009-03-12 20:36:59 +00001793
1794 def __new__(cls, id=None, **kwargs):
1795 """
1796 Look to see if we already have an instance for this particular type
1797 and id. If so, use it instead of creating a duplicate instance.
1798 """
1799 if id is not None:
1800 instance = cls._instances_by_type_and_id.get((cls, id))
1801 if instance:
1802 return instance
1803 return super(DBObject, cls).__new__(cls, id=id, **kwargs)
1804
1805
1806 def __init__(self, id=None, row=None, new_record=False, always_query=True):
jadmanski0afbb632008-06-06 21:10:57 +00001807 assert (bool(id) != bool(row))
showard6ae5ea92009-02-25 00:11:51 +00001808 assert self._table_name, '_table_name must be defined in your class'
1809 assert self._fields, '_fields must be defined in your class'
showarda3c58572009-03-12 20:36:59 +00001810 if not new_record:
1811 if self._initialized and not always_query:
1812 return # We've already been initialized.
1813 if id is None:
1814 id = row[0]
1815 # Tell future constructors to use us instead of re-querying while
1816 # this instance is still around.
1817 self._instances_by_type_and_id[(type(self), id)] = self
mbligh36768f02008-02-22 18:28:33 +00001818
showard6ae5ea92009-02-25 00:11:51 +00001819 self.__table = self._table_name
mbligh36768f02008-02-22 18:28:33 +00001820
jadmanski0afbb632008-06-06 21:10:57 +00001821 self.__new_record = new_record
mbligh36768f02008-02-22 18:28:33 +00001822
jadmanski0afbb632008-06-06 21:10:57 +00001823 if row is None:
showardccbd6c52009-03-21 00:10:21 +00001824 row = self._fetch_row_from_db(id)
mbligh36768f02008-02-22 18:28:33 +00001825
showarda3c58572009-03-12 20:36:59 +00001826 if self._initialized:
1827 differences = self._compare_fields_in_row(row)
1828 if differences:
1829 print ('initialized %s %s instance requery is updating: %s' %
1830 (type(self), self.id, differences))
showard2bab8f42008-11-12 18:15:22 +00001831 self._update_fields_from_row(row)
showarda3c58572009-03-12 20:36:59 +00001832 self._initialized = True
1833
1834
1835 @classmethod
1836 def _clear_instance_cache(cls):
1837 """Used for testing, clear the internal instance cache."""
1838 cls._instances_by_type_and_id.clear()
1839
1840
showardccbd6c52009-03-21 00:10:21 +00001841 def _fetch_row_from_db(self, row_id):
1842 sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
1843 rows = _db.execute(sql, (row_id,))
1844 if not rows:
1845 raise DBError("row not found (table=%s, id=%s)"
1846 % (self.__table, id))
1847 return rows[0]
1848
1849
showarda3c58572009-03-12 20:36:59 +00001850 def _assert_row_length(self, row):
1851 assert len(row) == len(self._fields), (
1852 "table = %s, row = %s/%d, fields = %s/%d" % (
1853 self.__table, row, len(row), self._fields, len(self._fields)))
1854
1855
1856 def _compare_fields_in_row(self, row):
1857 """
1858 Given a row as returned by a SELECT query, compare it to our existing
1859 in memory fields.
1860
1861 @param row - A sequence of values corresponding to fields named in
1862 The class attribute _fields.
1863
1864 @returns A dictionary listing the differences keyed by field name
1865 containing tuples of (current_value, row_value).
1866 """
1867 self._assert_row_length(row)
1868 differences = {}
1869 for field, row_value in itertools.izip(self._fields, row):
1870 current_value = getattr(self, field)
1871 if current_value != row_value:
1872 differences[field] = (current_value, row_value)
1873 return differences
showard2bab8f42008-11-12 18:15:22 +00001874
1875
1876 def _update_fields_from_row(self, row):
showarda3c58572009-03-12 20:36:59 +00001877 """
1878 Update our field attributes using a single row returned by SELECT.
1879
1880 @param row - A sequence of values corresponding to fields named in
1881 the class fields list.
1882 """
1883 self._assert_row_length(row)
mbligh36768f02008-02-22 18:28:33 +00001884
showard2bab8f42008-11-12 18:15:22 +00001885 self._valid_fields = set()
showarda3c58572009-03-12 20:36:59 +00001886 for field, value in itertools.izip(self._fields, row):
showard2bab8f42008-11-12 18:15:22 +00001887 setattr(self, field, value)
1888 self._valid_fields.add(field)
mbligh36768f02008-02-22 18:28:33 +00001889
showard2bab8f42008-11-12 18:15:22 +00001890 self._valid_fields.remove('id')
mbligh36768f02008-02-22 18:28:33 +00001891
mblighe2586682008-02-29 22:45:46 +00001892
showardccbd6c52009-03-21 00:10:21 +00001893 def update_from_database(self):
1894 assert self.id is not None
1895 row = self._fetch_row_from_db(self.id)
1896 self._update_fields_from_row(row)
1897
1898
jadmanski0afbb632008-06-06 21:10:57 +00001899 def count(self, where, table = None):
1900 if not table:
1901 table = self.__table
mbligh36768f02008-02-22 18:28:33 +00001902
jadmanski0afbb632008-06-06 21:10:57 +00001903 rows = _db.execute("""
1904 SELECT count(*) FROM %s
1905 WHERE %s
1906 """ % (table, where))
mbligh6f8bab42008-02-29 22:45:14 +00001907
jadmanski0afbb632008-06-06 21:10:57 +00001908 assert len(rows) == 1
1909
1910 return int(rows[0][0])
mbligh36768f02008-02-22 18:28:33 +00001911
1912
mblighf8c624d2008-07-03 16:58:45 +00001913 def update_field(self, field, value, condition=''):
showard2bab8f42008-11-12 18:15:22 +00001914 assert field in self._valid_fields
mbligh36768f02008-02-22 18:28:33 +00001915
showard2bab8f42008-11-12 18:15:22 +00001916 if getattr(self, field) == value:
jadmanski0afbb632008-06-06 21:10:57 +00001917 return
mbligh36768f02008-02-22 18:28:33 +00001918
mblighf8c624d2008-07-03 16:58:45 +00001919 query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
1920 if condition:
1921 query += ' AND (%s)' % condition
jadmanski0afbb632008-06-06 21:10:57 +00001922 _db.execute(query, (value, self.id))
1923
showard2bab8f42008-11-12 18:15:22 +00001924 setattr(self, field, value)
mbligh36768f02008-02-22 18:28:33 +00001925
1926
jadmanski0afbb632008-06-06 21:10:57 +00001927 def save(self):
1928 if self.__new_record:
showard6ae5ea92009-02-25 00:11:51 +00001929 keys = self._fields[1:] # avoid id
jadmanski0afbb632008-06-06 21:10:57 +00001930 columns = ','.join([str(key) for key in keys])
1931 values = ['"%s"' % self.__dict__[key] for key in keys]
showard89f84db2009-03-12 20:39:13 +00001932 values_str = ','.join(values)
1933 query = ('INSERT INTO %s (%s) VALUES (%s)' %
1934 (self.__table, columns, values_str))
jadmanski0afbb632008-06-06 21:10:57 +00001935 _db.execute(query)
showard89f84db2009-03-12 20:39:13 +00001936 # Update our id to the one the database just assigned to us.
1937 self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]
mbligh36768f02008-02-22 18:28:33 +00001938
1939
jadmanski0afbb632008-06-06 21:10:57 +00001940 def delete(self):
showarda3c58572009-03-12 20:36:59 +00001941 self._instances_by_type_and_id.pop((type(self), id), None)
1942 self._initialized = False
1943 self._valid_fields.clear()
jadmanski0afbb632008-06-06 21:10:57 +00001944 query = 'DELETE FROM %s WHERE id=%%s' % self.__table
1945 _db.execute(query, (self.id,))
mblighe2586682008-02-29 22:45:46 +00001946
1947
showard63a34772008-08-18 19:32:50 +00001948 @staticmethod
1949 def _prefix_with(string, prefix):
1950 if string:
1951 string = prefix + string
1952 return string
1953
1954
jadmanski0afbb632008-06-06 21:10:57 +00001955 @classmethod
showard989f25d2008-10-01 11:38:11 +00001956 def fetch(cls, where='', params=(), joins='', order_by=''):
showard89f84db2009-03-12 20:39:13 +00001957 """
1958 Construct instances of our class based on the given database query.
1959
1960 @yields One class instance for each row fetched.
1961 """
showard63a34772008-08-18 19:32:50 +00001962 order_by = cls._prefix_with(order_by, 'ORDER BY ')
1963 where = cls._prefix_with(where, 'WHERE ')
1964 query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
showard6ae5ea92009-02-25 00:11:51 +00001965 '%(where)s %(order_by)s' % {'table' : cls._table_name,
showard63a34772008-08-18 19:32:50 +00001966 'joins' : joins,
1967 'where' : where,
1968 'order_by' : order_by})
1969 rows = _db.execute(query, params)
jadmanski0afbb632008-06-06 21:10:57 +00001970 for row in rows:
1971 yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001972
mbligh36768f02008-02-22 18:28:33 +00001973
class IneligibleHostQueue(DBObject):
    """ORM mapping for the ineligible_host_queues table; each row pairs a
    job_id with a host_id."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001977
1978
showard89f84db2009-03-12 20:39:13 +00001979class AtomicGroup(DBObject):
1980 _table_name = 'atomic_groups'
showard205fd602009-03-21 00:17:35 +00001981 _fields = ('id', 'name', 'description', 'max_number_of_machines',
1982 'invalid')
showard89f84db2009-03-12 20:39:13 +00001983
1984
showard989f25d2008-10-01 11:38:11 +00001985class Label(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001986 _table_name = 'labels'
1987 _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
showard89f84db2009-03-12 20:39:13 +00001988 'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001989
1990
mbligh36768f02008-02-22 18:28:33 +00001991class Host(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00001992 _table_name = 'hosts'
1993 _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
1994 'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')
1995
1996
jadmanski0afbb632008-06-06 21:10:57 +00001997 def current_task(self):
1998 rows = _db.execute("""
1999 SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
2000 """, (self.id,))
2001
2002 if len(rows) == 0:
2003 return None
2004 else:
2005 assert len(rows) == 1
2006 results = rows[0];
jadmanski0afbb632008-06-06 21:10:57 +00002007 return HostQueueEntry(row=results)
mbligh36768f02008-02-22 18:28:33 +00002008
2009
jadmanski0afbb632008-06-06 21:10:57 +00002010 def yield_work(self):
showardb18134f2009-03-20 20:52:18 +00002011 logging.info("%s yielding work", self.hostname)
jadmanski0afbb632008-06-06 21:10:57 +00002012 if self.current_task():
2013 self.current_task().requeue()
2014
showard6ae5ea92009-02-25 00:11:51 +00002015
jadmanski0afbb632008-06-06 21:10:57 +00002016 def set_status(self,status):
showardb18134f2009-03-20 20:52:18 +00002017 logging.info('%s -> %s', self.hostname, status)
jadmanski0afbb632008-06-06 21:10:57 +00002018 self.update_field('status',status)
mbligh36768f02008-02-22 18:28:33 +00002019
2020
showard170873e2009-01-07 00:22:26 +00002021 def platform_and_labels(self):
showardd8e548a2008-09-09 03:04:57 +00002022 """
showard170873e2009-01-07 00:22:26 +00002023 Returns a tuple (platform_name, list_of_all_label_names).
showardd8e548a2008-09-09 03:04:57 +00002024 """
2025 rows = _db.execute("""
showard170873e2009-01-07 00:22:26 +00002026 SELECT labels.name, labels.platform
showardd8e548a2008-09-09 03:04:57 +00002027 FROM labels
2028 INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
showard170873e2009-01-07 00:22:26 +00002029 WHERE hosts_labels.host_id = %s
showardd8e548a2008-09-09 03:04:57 +00002030 ORDER BY labels.name
2031 """, (self.id,))
showard170873e2009-01-07 00:22:26 +00002032 platform = None
2033 all_labels = []
2034 for label_name, is_platform in rows:
2035 if is_platform:
2036 platform = label_name
2037 all_labels.append(label_name)
2038 return platform, all_labels
2039
2040
2041 def reverify_tasks(self):
2042 cleanup_task = CleanupTask(host=self)
2043 verify_task = VerifyTask(host=self)
2044 # just to make sure this host does not get taken away
2045 self.set_status('Cleaning')
2046 return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002047
2048
class HostQueueEntry(DBObject):
    """
    One host's slot within a job -- a row of the host_queue_entries table.

    Tracks the assigned host (if any), the entry's scheduler status, and
    the execution subdirectory its results are written under.
    """
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
        self.job = Job(self.job_id)

        if self.host_id:
            self.host = Host(self.host_id)
        else:
            self.host = None

        # Per-entry log of scheduling decisions, kept under the job's tag.
        self.queue_log_path = os.path.join(self.job.tag(),
                                           'queue.log.' + str(self.id))


    @classmethod
    def clone(cls, template):
        """
        Creates a new row using the values from a template instance.

        The new instance will not exist in the database or have a valid
        id attribute until its save() method is called.
        """
        assert isinstance(template, cls)
        new_row = [getattr(template, field) for field in cls._fields]
        clone = cls(row=new_row, new_record=True)
        clone.id = None
        return clone


    def _view_job_url(self):
        # Frontend URL pointing at this entry's job.
        return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)


    def set_host(self, host):
        """Assign a host to this entry, or release it when host is None."""
        if host:
            self.queue_log_record('Assigning host ' + host.hostname)
            self.update_field('host_id', host.id)
            self.update_field('active', True)
            self.block_host(host.id)
        else:
            self.queue_log_record('Releasing host')
            self.unblock_host(self.host.id)
            self.update_field('host_id', None)

        self.host = host


    def get_host(self):
        return self.host


    def queue_log_record(self, log_line):
        """Append a timestamped line to this entry's queue log file."""
        now = str(datetime.datetime.now())
        _drone_manager.write_lines_to_file(self.queue_log_path,
                                           [now + ' ' + log_line])


    def block_host(self, host_id):
        """Mark host_id ineligible for other entries of this job."""
        logging.info("creating block %s/%s", self.job.id, host_id)
        row = [0, self.job.id, host_id]
        block = IneligibleHostQueue(row=row, new_record=True)
        block.save()


    def unblock_host(self, host_id):
        """Remove any job/host ineligibility created by block_host()."""
        logging.info("removing block %s/%s", self.job.id, host_id)
        blocks = IneligibleHostQueue.fetch(
            'job_id=%d and host_id=%d' % (self.job.id, host_id))
        for block in blocks:
            block.delete()


    def set_execution_subdir(self, subdir=None):
        # With no explicit subdir, default to the assigned host's hostname.
        if subdir is None:
            assert self.get_host()
            subdir = self.get_host().hostname
        self.update_field('execution_subdir', subdir)


    def _get_hostname(self):
        if self.host:
            return self.host.hostname
        return 'no host'


    def __str__(self):
        return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)


    def set_status(self, status):
        """
        Update this entry's status, keeping the active/complete flags in
        sync and sending notification email where configured.
        """
        abort_statuses = ['Abort', 'Aborting', 'Aborted']
        if status not in abort_statuses:
            # Refuse to overwrite an abort status with a non-abort one.
            condition = ' AND '.join(['status <> "%s"' % x
                                      for x in abort_statuses])
        else:
            condition = ''
        self.update_field('status', status, condition=condition)

        logging.info("%s -> %s", self, self.status)

        if status in ['Queued', 'Parsing']:
            self.update_field('complete', False)
            self.update_field('active', False)

        if status in ['Pending', 'Running', 'Verifying', 'Starting',
                      'Aborting']:
            self.update_field('complete', False)
            self.update_field('active', True)

        if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
            self.update_field('complete', True)
            self.update_field('active', False)

        should_email_status = (status.lower() in _notify_email_statuses or
                               'all' in _notify_email_statuses)
        if should_email_status:
            self._email_on_status(status)

        self._email_on_job_complete()


    def _email_on_status(self, status):
        """Email the job's notification list about this status change."""
        hostname = self._get_hostname()

        subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
                self.job.id, self.job.name, hostname, status)
        body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
                self.job.id, self.job.name, hostname, status,
                self._view_job_url())
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def _email_on_job_complete(self):
        """Send a per-host summary email once the whole job has finished."""
        if not self.job.is_finished():
            return

        summary_text = []
        hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
        for queue_entry in hosts_queue:
            summary_text.append("Host: %s Status: %s" %
                                (queue_entry._get_hostname(),
                                 queue_entry.status))

        summary_text = "\n".join(summary_text)
        status_counts = models.Job.objects.get_status_counts(
                [self.job.id])[self.job.id]
        status = ', '.join('%d %s' % (count, status) for status, count
                           in status_counts.iteritems())

        subject = 'Autotest: Job ID: %s "%s" %s' % (
                self.job.id, self.job.name, status)
        body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
                self.job.id, self.job.name, status, self._view_job_url(),
                summary_text)
        email_manager.manager.send_email(self.job.email_list, subject, body)


    def run(self, assigned_host=None):
        """Hand this entry to its job for execution, assigning a host first
        when the entry is a metahost or atomic-group entry."""
        if self.meta_host is not None or self.atomic_group_id is not None:
            assert assigned_host
            # ensure results dir exists for the queue log
            self.set_host(assigned_host)

        logging.info("%s/%s/%s scheduled on %s, status=%s",
                     self.job.name, self.meta_host, self.atomic_group_id,
                     self.host.hostname, self.status)

        return self.job.run(queue_entry=self)


    def requeue(self):
        """Put this entry back in the queue to be scheduled again."""
        self.set_status('Queued')
        # verify/cleanup failure sets the execution subdir, so reset it here
        self.set_execution_subdir('')
        if self.meta_host:
            self.set_host(None)


    def handle_host_failure(self):
        """\
        Called when this queue entry's host has failed verification and
        repair.
        """
        assert not self.meta_host
        self.set_status('Failed')
        self.job.stop_if_necessary()


    @property
    def aborted_by(self):
        self._load_abort_info()
        return self._aborted_by


    @property
    def aborted_on(self):
        self._load_abort_info()
        return self._aborted_on


    def _load_abort_info(self):
        """ Fetch info about who aborted the job. """
        # Cached after the first lookup via the _aborted_by attribute.
        if hasattr(self, "_aborted_by"):
            return
        rows = _db.execute("""
                SELECT users.login, aborted_host_queue_entries.aborted_on
                FROM aborted_host_queue_entries
                INNER JOIN users
                ON users.id = aborted_host_queue_entries.aborted_by_id
                WHERE aborted_host_queue_entries.queue_entry_id = %s
                """, (self.id,))
        if rows:
            self._aborted_by, self._aborted_on = rows[0]
        else:
            self._aborted_by = self._aborted_on = None


    def on_pending(self):
        """
        Called when an entry in a synchronous job has passed verify. If the
        job is ready to run, returns an agent to run the job. Returns None
        otherwise.
        """
        self.set_status('Pending')
        self.get_host().set_status('Pending')
        if self.job.is_ready():
            return self.job.run(self)
        self.job.stop_if_necessary()
        return None


    def abort(self, dispatcher, agents_to_abort=None):
        """
        Abort this entry: schedule reverify tasks for its host when the
        entry is active, then queue an AbortTask for the given agents.

        agents_to_abort defaults to an empty list; a None sentinel is used
        to avoid the shared-mutable-default-argument pitfall.
        """
        if agents_to_abort is None:
            agents_to_abort = []
        host = self.get_host()
        if self.active and host:
            dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))

        abort_task = AbortTask(self, agents_to_abort)
        self.set_status('Aborting')
        dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))


    def execution_tag(self):
        """Return '<job id>-<owner>/<subdir>' identifying this execution."""
        assert self.execution_subdir
        return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002299
2300
class Job(DBObject):
    """
    A job (a row of the jobs table) plus the scheduler logic that groups
    its queue entries and launches autoserv for them.
    """
    _table_name = 'jobs'
    _fields = ('id', 'owner', 'name', 'priority', 'control_file',
               'control_type', 'created_on', 'synch_count', 'timeout',
               'run_verify', 'email_list', 'reboot_before', 'reboot_after')


    def __init__(self, id=None, row=None, **kwargs):
        assert id or row
        super(Job, self).__init__(id=id, row=row, **kwargs)


    def is_server_job(self):
        # control_type 2 denotes a client-side job; everything else is
        # treated as a server job.
        return self.control_type != 2


    def tag(self):
        """Return the '<id>-<owner>' tag used for this job's result paths."""
        return "%s-%s" % (self.id, self.owner)


    def get_host_queue_entries(self):
        """Return all HostQueueEntry objects belonging to this job."""
        rows = _db.execute("""
                SELECT * FROM host_queue_entries
                WHERE job_id= %s
        """, (self.id,))
        entries = [HostQueueEntry(row=i) for i in rows]

        # Every job must have at least one queue entry.
        assert len(entries) > 0

        return entries


    def set_status(self, status, update_queues=False):
        """Set this job's status, optionally cascading to its entries."""
        self.update_field('status', status)

        if update_queues:
            for queue_entry in self.get_host_queue_entries():
                queue_entry.set_status(status)


    def is_ready(self):
        # A (possibly synchronous) job is ready once at least synch_count
        # of its entries have reached Pending.
        pending_entries = models.HostQueueEntry.objects.filter(
                job=self.id, status='Pending')
        return (pending_entries.count() >= self.synch_count)


    def num_machines(self, clause=None):
        """Count this job's queue entries, optionally filtered by clause."""
        sql = "job_id=%s" % self.id
        if clause:
            sql += " AND (%s)" % clause
        return self.count(sql, table='host_queue_entries')


    def num_queued(self):
        return self.num_machines('not complete')


    def num_active(self):
        return self.num_machines('active')


    def num_complete(self):
        return self.num_machines('complete')


    def is_finished(self):
        return self.num_complete() == self.num_machines()


    def _not_yet_run_entries(self, include_verifying=True):
        """Return a queryset of this job's Queued/Pending (and optionally
        Verifying) entries."""
        statuses = [models.HostQueueEntry.Status.QUEUED,
                    models.HostQueueEntry.Status.PENDING]
        if include_verifying:
            statuses.append(models.HostQueueEntry.Status.VERIFYING)
        return models.HostQueueEntry.objects.filter(job=self.id,
                                                    status__in=statuses)


    def _stop_all_entries(self):
        """Mark all not-yet-run entries Stopped, freeing Pending hosts."""
        entries_to_stop = self._not_yet_run_entries(
            include_verifying=False)
        for child_entry in entries_to_stop:
            assert not child_entry.complete, (
                '%s status=%s, active=%s, complete=%s' %
                (child_entry.id, child_entry.status, child_entry.active,
                 child_entry.complete))
            if child_entry.status == models.HostQueueEntry.Status.PENDING:
                child_entry.host.status = models.Host.Status.READY
                child_entry.host.save()
            child_entry.status = models.HostQueueEntry.Status.STOPPED
            child_entry.save()


    def stop_if_necessary(self):
        # If too few runnable entries remain to ever satisfy synch_count,
        # the job can never run -- stop the remaining entries.
        not_yet_run = self._not_yet_run_entries()
        if not_yet_run.count() < self.synch_count:
            self._stop_all_entries()


    def write_to_machines_file(self, queue_entry):
        """Append the entry's hostname to this job's .machines file."""
        hostname = queue_entry.get_host().hostname
        file_path = os.path.join(self.tag(), '.machines')
        _drone_manager.write_lines_to_file(file_path, [hostname])


    def _next_group_name(self):
        """Return the next unused 'groupN' execution subdir name."""
        query = models.HostQueueEntry.objects.filter(
            job=self.id).values('execution_subdir').distinct()
        subdirs = (entry['execution_subdir'] for entry in query)
        groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
        ids = [int(match.group(1)) for match in groups if match]
        if ids:
            next_id = max(ids) + 1
        else:
            next_id = 0
        return "group%d" % next_id


    def _write_control_file(self, execution_tag):
        """Attach the job's control file to the execution; return its path."""
        control_path = _drone_manager.attach_file_to_execution(
            execution_tag, self.control_file)
        return control_path


    def get_group_entries(self, queue_entry_from_group):
        """Return all entries sharing queue_entry_from_group's subdir."""
        execution_subdir = queue_entry_from_group.execution_subdir
        return list(HostQueueEntry.fetch(
            where='job_id=%s AND execution_subdir=%s',
            params=(self.id, execution_subdir)))


    def _get_autoserv_params(self, queue_entries):
        """Build the autoserv command line for a group of queue entries."""
        assert queue_entries
        execution_tag = queue_entries[0].execution_tag()
        control_path = self._write_control_file(execution_tag)
        hostnames = ','.join([entry.get_host().hostname
                              for entry in queue_entries])

        params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
                  '-r', _drone_manager.absolute_path(execution_tag),
                  '-u', self.owner, '-l', self.name, '-m', hostnames,
                  _drone_manager.absolute_path(control_path)]

        if not self.is_server_job():
            params.append('-c')

        return params


    def _should_run_cleanup(self, queue_entry):
        """Decide whether a pre-job cleanup is needed per reboot_before."""
        if self.reboot_before == models.RebootBefore.ALWAYS:
            return True
        elif self.reboot_before == models.RebootBefore.IF_DIRTY:
            return queue_entry.get_host().dirty
        return False


    def _should_run_verify(self, queue_entry):
        """Decide whether pre-job verify is needed, honoring DO_NOT_VERIFY."""
        do_not_verify = (queue_entry.host.protection ==
                         host_protections.Protection.DO_NOT_VERIFY)
        if do_not_verify:
            return False
        return self.run_verify


    def _get_pre_job_tasks(self, queue_entry):
        """Return cleanup/verify tasks (as needed) plus the pending task."""
        tasks = []
        if self._should_run_cleanup(queue_entry):
            tasks.append(CleanupTask(queue_entry=queue_entry))
        if self._should_run_verify(queue_entry):
            tasks.append(VerifyTask(queue_entry=queue_entry))
        tasks.append(SetEntryPendingTask(queue_entry))
        return tasks


    def _assign_new_group(self, queue_entries):
        """Give all of queue_entries a common execution subdir name."""
        if len(queue_entries) == 1:
            group_name = queue_entries[0].get_host().hostname
        else:
            group_name = self._next_group_name()
            logging.info('Running synchronous job %d hosts %s as %s',
                         self.id,
                         [entry.host.hostname for entry in queue_entries],
                         group_name)

        for queue_entry in queue_entries:
            queue_entry.set_execution_subdir(group_name)


    def _choose_group_to_run(self, include_queue_entry):
        """Pick up to synch_count Pending entries (always including
        include_queue_entry) and group them for a single run."""
        chosen_entries = [include_queue_entry]

        num_entries_needed = self.synch_count - 1
        if num_entries_needed > 0:
            pending_entries = HostQueueEntry.fetch(
                where='job_id = %s AND status = "Pending" AND id != %s',
                params=(self.id, include_queue_entry.id))
            chosen_entries += list(pending_entries)[:num_entries_needed]

        self._assign_new_group(chosen_entries)
        return chosen_entries


    def run(self, queue_entry):
        """Return an Agent that advances queue_entry: pre-job tasks while
        the job is not yet ready, otherwise the actual autoserv run."""
        if not self.is_ready():
            queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
            return Agent(self._get_pre_job_tasks(queue_entry))

        queue_entries = self._choose_group_to_run(queue_entry)
        return self._finish_run(queue_entries)


    def _finish_run(self, queue_entries, initial_tasks=()):
        """Mark entries Starting and return an Agent running autoserv.

        initial_tasks defaults to an immutable empty tuple rather than a
        mutable [] (shared mutable-default pitfall); callers may still
        pass a list, which is copied before the queue task is appended.
        """
        for queue_entry in queue_entries:
            queue_entry.set_status('Starting')
        params = self._get_autoserv_params(queue_entries)
        queue_task = QueueTask(job=self, queue_entries=queue_entries,
                               cmd=params)
        tasks = list(initial_tasks) + [queue_task]

        return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002521
2522
# Script entry point; main() is defined earlier in this file.
if __name__ == '__main__':
    main()