blob: 83df096a8a4e651875127410001d53e23bc51b5d [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

# Default to the autotest tree this file lives in; an installed tree may
# override via the AUTOTEST_DIR environment variable.
AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Fixed: use the `in` operator instead of the deprecated dict.has_key().
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60 # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server. Full results may not be available. Sorry.
"""

# Module-level state shared across the scheduler; the underscore-prefixed
# globals are (re)assigned in main()/init().
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
63
mbligh36768f02008-02-22 18:28:33 +000064
def main():
    """Scheduler entry point.

    Parses command-line options, initializes the module-level globals,
    starts the status server, then runs the dispatcher tick loop until
    _shutdown is set (by handle_sigint) or an exception escapes.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to. Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        # the single positional argument is the results directory
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    # statuses (comma/space/semicolon/colon separated) for which email
    # notifications should be sent; normalized to lowercase
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        # swap in a dummy autoserv so no real jobs are executed
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # deliberately broad: mail the stacktrace out before shutting down
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    # orderly shutdown: flush email, stop status server, drones and DB
    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000140
141
def handle_sigint(signum, frame):
    """SIGINT handler: request that the main loop exit after this tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000146
147
def init(logfile):
    """One-time scheduler initialization.

    Sets up logging redirection, the global database connection, Django
    autocommit, the SIGINT handler and the drone manager.

    @param logfile: path to redirect stdout/stderr to, or None/'' to skip.
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if _testing_mode:
        # point at the stress-test database instead of the real one
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # Fixed: renamed the local from 'drones' to avoid shadowing the
    # 'drones' module imported at the top of this file.
    drone_hostnames = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drone_hostnames.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000177
178
def enable_logging(logfile):
    """Redirect stdout to *logfile* and stderr to *logfile*.err, both opened
    unbuffered in append mode, at the file-object AND file-descriptor level
    (so child processes inherit the redirection)."""
    stdout_path = logfile
    stderr_path = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", stdout_path, stderr_path)
    stdout_stream = open(stdout_path, "a", buffering=0)
    stderr_stream = open(stderr_path, "a", buffering=0)

    # repoint the OS-level descriptors first...
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    # ...then swap the Python-level file objects
    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000191
192
def queue_entries_to_abort():
    """Fetch every host queue entry whose status is 'Abort'."""
    rows = _db.execute("""
            SELECT * FROM host_queue_entries WHERE status='Abort';
            """)

    return [HostQueueEntry(row=row) for row in rows]
mbligh36768f02008-02-22 18:28:33 +0000200
showard7cf9a9b2008-05-15 21:15:52 +0000201
class SchedulerError(Exception):
    """Raised by HostScheduler when it detects an inconsistent state."""
204
205
class HostScheduler(object):
    """Assigns ready hosts to pending host queue entries.

    refresh() loads a snapshot of scheduling state (ready hosts, ACLs,
    label membership, job dependencies) from the database; the
    find_eligible_* methods then consume that snapshot, popping hosts out
    of the internal availability map as they are claimed.
    """
    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        # render ids as "1,2,3" for interpolation into SQL IN (...) clauses
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run *query* (which contains one %s IN-list placeholder) for
        *id_list* and fold the two-column result into a dict of sets."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left_id, right_id) rows into {left_id: set(right_ids)}.

        @param flip: when True, reverse the mapping direction.
        """
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        # map job id -> set of ACL group ids its owner belongs to
        query = """
        SELECT jobs.id, acl_groups_users.aclgroup_id
        FROM jobs
        INNER JOIN users ON users.login = jobs.owner
        INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
        WHERE jobs.id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        # map job id -> set of host ids explicitly barred from that job
        query = """
        SELECT job_id, host_id
        FROM ineligible_host_queues
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        # map job id -> set of label ids the job depends on
        query = """
        SELECT job_id, label_id
        FROM jobs_dependency_labels
        WHERE job_id IN (%s)
        """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        # map host id -> set of ACL group ids that grant access to it
        query = """
        SELECT host_id, aclgroup_id
        FROM acl_groups_hosts
        WHERE host_id IN (%s)
        """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (labels_to_hosts, hosts_to_labels) dicts of sets for the
        given host ids."""
        if not host_ids:
            return {}, {}
        query = """
        SELECT label_id, host_id
        FROM hosts_labels
        WHERE host_id IN (%s)
        """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        # map label id -> Label object for every label in the database
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload the scheduling snapshot for the given pending entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job's owner must share at least one ACL group with the host
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return bool(host_acls & job_acls)


    def _check_job_dependencies(self, job_dependencies, host_labels):
        # every dependency label of the job must be present on the host
        missing = job_dependencies - host_labels
        return not missing


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """False if the host carries an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        return (id for id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        # a host is eligible when ACLs, dependency labels, only_if_needed
        # labels and atomic group membership all check out
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        # the entry names a specific host; claim it if it is still eligible
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim the first usable, eligible host in the metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Return a claimed Host for this entry, or None if none is free."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            # Fixed: the original called undefined bprint(); report through
            # the logging module (as the rest of this file does) with lazy
            # %-style arguments.
            logging.error('Error: job %d synch_count=%d > requested '
                          'atomic_group %d max_number_of_machines=%d. '
                          'Aborted host_queue_entry %d.',
                          job.id, job.synch_count, atomic_group.id,
                          atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                    group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                        eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
549
550
showard170873e2009-01-07 00:22:26 +0000551class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000552 def __init__(self):
553 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000554 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000555 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000556 self._host_agents = {}
557 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000558
mbligh36768f02008-02-22 18:28:33 +0000559
    def do_initial_recovery(self, recover_hosts=True):
        """Startup recovery: re-attach to orphaned processes and, optionally,
        reverify dead hosts.

        @param recover_hosts: when False, skip host recovery (the
                --recover-hosts command-line flag controls this).
        """
        # always recover processes
        self._recover_processes()

        if recover_hosts:
            self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000566
567
    def tick(self):
        """Run one scheduler iteration.

        Refreshes drone state, runs periodic cleanup if due, processes
        aborts, schedules new jobs, polls agents, then flushes queued drone
        actions and notification emails.  The call order matters.
        """
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000576
showard97aed502008-11-04 02:01:24 +0000577
showarda3ab0d52008-11-03 19:03:47 +0000578 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000579 should_cleanup = (self._last_clean_time +
580 scheduler_config.config.clean_interval * 60 <
581 time.time())
582 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000583 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000584 self._abort_timed_out_jobs()
585 self._abort_jobs_past_synch_start_timeout()
586 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000587 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000588 self._last_clean_time = time.time()
589
mbligh36768f02008-02-22 18:28:33 +0000590
showard170873e2009-01-07 00:22:26 +0000591 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
592 for object_id in object_ids:
593 agent_dict.setdefault(object_id, set()).add(agent)
594
595
596 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
597 for object_id in object_ids:
598 assert object_id in agent_dict
599 agent_dict[object_id].remove(agent)
600
601
jadmanski0afbb632008-06-06 21:10:57 +0000602 def add_agent(self, agent):
603 self._agents.append(agent)
604 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000605 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
606 self._register_agent_for_ids(self._queue_entry_agents,
607 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000608
showard170873e2009-01-07 00:22:26 +0000609
610 def get_agents_for_entry(self, queue_entry):
611 """
612 Find agents corresponding to the specified queue_entry.
613 """
614 return self._queue_entry_agents.get(queue_entry.id, set())
615
616
617 def host_has_agent(self, host):
618 """
619 Determine if there is currently an Agent present using this host.
620 """
621 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000622
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 def remove_agent(self, agent):
625 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000626 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
627 agent)
628 self._unregister_agent_for_ids(self._queue_entry_agents,
629 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000630
631
showard4c5374f2008-09-04 17:02:56 +0000632 def num_running_processes(self):
633 return sum(agent.num_processes for agent in self._agents
634 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000635
636
showard170873e2009-01-07 00:22:26 +0000637 def _extract_execution_tag(self, command_line):
638 match = re.match(r'.* -P (\S+) ', command_line)
639 if not match:
640 return None
641 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000642
643
showard2bab8f42008-11-12 18:15:22 +0000644 def _recover_queue_entries(self, queue_entries, run_monitor):
645 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000646 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
647 queue_entries=queue_entries,
648 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000649 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000650 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000651
652
    def _recover_processes(self):
        """Full process-recovery sequence, run once at startup.

        Order matters: pidfiles must be registered before the drone refresh
        that reads them, and running entries must be recovered before the
        remaining active entries are requeued.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def _register_pidfiles(self):
668 # during recovery we may need to read pidfiles for both running and
669 # parsing entries
670 queue_entries = HostQueueEntry.fetch(
671 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000672 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000673 pidfile_id = _drone_manager.get_pidfile_id_from(
674 queue_entry.execution_tag())
675 _drone_manager.register_pidfile(pidfile_id)
676
677
678 def _recover_running_entries(self):
679 orphans = _drone_manager.get_orphaned_autoserv_processes()
680
681 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
682 requeue_entries = []
683 for queue_entry in queue_entries:
684 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000685 # synchronous job we've already recovered
686 continue
showard170873e2009-01-07 00:22:26 +0000687 execution_tag = queue_entry.execution_tag()
688 run_monitor = PidfileRunMonitor()
689 run_monitor.attach_to_existing_process(execution_tag)
690 if not run_monitor.has_process():
691 # autoserv apparently never got run, so let it get requeued
692 continue
showarde788ea62008-11-17 21:02:47 +0000693 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000694 logging.info('Recovering %s (process %s)',
695 (', '.join(str(entry) for entry in queue_entries),
696 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000697 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000698 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000699
jadmanski0afbb632008-06-06 21:10:57 +0000700 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000701 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000702 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000703 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000704
showard170873e2009-01-07 00:22:26 +0000705
706 def _recover_aborting_entries(self):
707 queue_entries = HostQueueEntry.fetch(
708 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000709 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000710 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000711 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000712
showard97aed502008-11-04 02:01:24 +0000713
showard170873e2009-01-07 00:22:26 +0000714 def _requeue_other_active_entries(self):
715 queue_entries = HostQueueEntry.fetch(
716 where='active AND NOT complete AND status != "Pending"')
717 for queue_entry in queue_entries:
718 if self.get_agents_for_entry(queue_entry):
719 # entry has already been recovered
720 continue
showardb18134f2009-03-20 20:52:18 +0000721 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
722 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000723 if queue_entry.host:
724 tasks = queue_entry.host.reverify_tasks()
725 self.add_agent(Agent(tasks))
726 agent = queue_entry.requeue()
727
728
    def _reverify_remaining_hosts(self):
        """
        Schedule reverification for hosts left in transient states.

        Covers hosts interrupted mid-verify/repair/cleanup, plus 'Running'
        hosts with no active queue entry (which indicates a scheduler bug
        and is reported as such).
        """
        # reverify hosts that were in the middle of verify, repair or cleanup
        self._reverify_hosts_where("""(status = 'Repairing' OR
                                       status = 'Verifying' OR
                                       status = 'Cleaning')""")

        # recover "Running" hosts with no active queue entries, although this
        # should never happen
        message = ('Recovering running host %s - this probably indicates a '
                   'scheduler bug')
        self._reverify_hosts_where("""status = 'Running' AND
                                      id NOT IN (SELECT host_id
                                                 FROM host_queue_entries
                                                 WHERE active)""",
                                   print_message=message)
mblighbb421852008-03-11 22:36:16 +0000744
745
jadmanski0afbb632008-06-06 21:10:57 +0000746 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000747 print_message='Reverifying host %s'):
748 full_where='locked = 0 AND invalid = 0 AND ' + where
749 for host in Host.fetch(where=full_where):
750 if self.host_has_agent(host):
751 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000752 continue
showard170873e2009-01-07 00:22:26 +0000753 if print_message:
showardb18134f2009-03-20 20:52:18 +0000754 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000755 tasks = host.reverify_tasks()
756 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000757
758
showard97aed502008-11-04 02:01:24 +0000759 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000760 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000761 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000762 if entry.id in recovered_entry_ids:
763 continue
764 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000765 recovered_entry_ids = recovered_entry_ids.union(
766 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000767 logging.info('Recovering parsing entries %s',
768 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000769
770 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000771 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000772
773
jadmanski0afbb632008-06-06 21:10:57 +0000774 def _recover_hosts(self):
775 # recover "Repair Failed" hosts
776 message = 'Reverifying dead host %s'
777 self._reverify_hosts_where("status = 'Repair Failed'",
778 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000779
780
    def _abort_timed_out_jobs(self):
        """
        Aborts all jobs that have timed out and not completed
        """
        # raw SQL in extra(): a job times out `timeout` hours after creation
        query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
            where=['created_on + INTERVAL timeout HOUR < NOW()'])
        for job in query.distinct():
            logging.warning('Aborting job %d due to job timeout', job.id)
            job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000790
791
showard98863972008-10-29 21:14:56 +0000792 def _abort_jobs_past_synch_start_timeout(self):
793 """
794 Abort synchronous jobs that are past the start timeout (from global
795 config) and are holding a machine that's in everyone.
796 """
797 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000798 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000799 timeout_start = datetime.datetime.now() - timeout_delta
800 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000801 created_on__lt=timeout_start,
802 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000803 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000804 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000805 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000806 entries_to_abort = job.hostqueueentry_set.exclude(
807 status=models.HostQueueEntry.Status.RUNNING)
808 for queue_entry in entries_to_abort:
809 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000810
811
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.

        Deletes ineligible_host_queues rows whose job has no incomplete
        queue entries left.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000824
825
    def _get_pending_queue_entries(self):
        """
        Return a list of schedulable (queued, inactive) HostQueueEntries.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000832
833
showard89f84db2009-03-12 20:39:13 +0000834 def _refresh_pending_queue_entries(self):
835 """
836 Lookup the pending HostQueueEntries and call our HostScheduler
837 refresh() method given that list. Return the list.
838
839 @returns A list of pending HostQueueEntries sorted in priority order.
840 """
showard63a34772008-08-18 19:32:50 +0000841 queue_entries = self._get_pending_queue_entries()
842 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000843 return []
showardb95b1bd2008-08-15 18:11:04 +0000844
showard63a34772008-08-18 19:32:50 +0000845 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000846
showard89f84db2009-03-12 20:39:13 +0000847 return queue_entries
848
849
850 def _schedule_atomic_group(self, queue_entry):
851 """
852 Schedule the given queue_entry on an atomic group of hosts.
853
854 Returns immediately if there are insufficient available hosts.
855
856 Creates new HostQueueEntries based off of queue_entry for the
857 scheduled hosts and starts them all running.
858 """
859 # This is a virtual host queue entry representing an entire
860 # atomic group, find a group and schedule their hosts.
861 group_hosts = self._host_scheduler.find_eligible_atomic_group(
862 queue_entry)
863 if not group_hosts:
864 return
865 # The first assigned host uses the original HostQueueEntry
866 group_queue_entries = [queue_entry]
867 for assigned_host in group_hosts[1:]:
868 # Create a new HQE for every additional assigned_host.
869 new_hqe = HostQueueEntry.clone(queue_entry)
870 new_hqe.save()
871 group_queue_entries.append(new_hqe)
872 assert len(group_queue_entries) == len(group_hosts)
873 for queue_entry, host in itertools.izip(group_queue_entries,
874 group_hosts):
875 self._run_queue_entry(queue_entry, host)
876
877
878 def _schedule_new_jobs(self):
879 queue_entries = self._refresh_pending_queue_entries()
880 if not queue_entries:
881 return
882
showard63a34772008-08-18 19:32:50 +0000883 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000884 if (queue_entry.atomic_group_id is None or
885 queue_entry.host_id is not None):
886 assigned_host = self._host_scheduler.find_eligible_host(
887 queue_entry)
888 if assigned_host:
889 self._run_queue_entry(queue_entry, assigned_host)
890 else:
891 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000892
893
894 def _run_queue_entry(self, queue_entry, host):
895 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000896 # in some cases (synchronous jobs with run_verify=False), agent may be
897 # None
showard9976ce92008-10-15 20:28:13 +0000898 if agent:
899 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000900
901
jadmanski0afbb632008-06-06 21:10:57 +0000902 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000903 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000904 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000905 for agent in agents_to_abort:
906 self.remove_agent(agent)
907
showard170873e2009-01-07 00:22:26 +0000908 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000909
910
showard324bf812009-01-20 23:23:38 +0000911 def _can_start_agent(self, agent, num_started_this_cycle,
912 have_reached_limit):
showard4c5374f2008-09-04 17:02:56 +0000913 # always allow zero-process agents to run
914 if agent.num_processes == 0:
915 return True
916 # don't allow any nonzero-process agents to run after we've reached a
917 # limit (this avoids starvation of many-process agents)
918 if have_reached_limit:
919 return False
920 # total process throttling
showard324bf812009-01-20 23:23:38 +0000921 if agent.num_processes > _drone_manager.max_runnable_processes():
showard4c5374f2008-09-04 17:02:56 +0000922 return False
923 # if a single agent exceeds the per-cycle throttling, still allow it to
924 # run when it's the first agent in the cycle
925 if num_started_this_cycle == 0:
926 return True
927 # per-cycle throttling
928 if (num_started_this_cycle + agent.num_processes >
showardd1ee1dd2009-01-07 21:33:08 +0000929 scheduler_config.config.max_processes_started_per_cycle):
showard4c5374f2008-09-04 17:02:56 +0000930 return False
931 return True
932
933
    def _handle_agents(self):
        """
        Run one scheduling cycle over all agents: remove finished agents,
        start new ones subject to throttling, and tick every live agent.
        """
        # processes started so far this cycle, for per-cycle throttling
        num_started_this_cycle = 0
        # once any agent is refused, all later nonzero-process agents are
        # refused too (prevents starvation of many-process agents)
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000952
953
showardfa8629c2008-11-04 16:51:23 +0000954 def _check_for_db_inconsistencies(self):
955 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
956 if query.count() != 0:
957 subject = ('%d queue entries found with active=complete=1'
958 % query.count())
959 message = '\n'.join(str(entry.get_object_dict())
960 for entry in query[:50])
961 if len(query) > 50:
962 message += '\n(truncated)\n'
963
showardb18134f2009-03-20 20:52:18 +0000964 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000965 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000966
967
class PidfileRunMonitor(object):
    """
    Client must call either run() to start a new process or
    attach_to_existing_process().

    Tracks an autoserv process through its pidfile via the drone manager.
    self._state mirrors the pidfile: process, exit_status and
    num_tests_failed (see _get_pidfile_info for the possible combinations).
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # set True once we give up on the process (see on_lost_process)
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # prefix the command with `nice` when a nonzero nice level is given
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        # remember when monitoring began, for the pidfile-write timeout
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Start a new process via the drone manager and monitor its pidfile.
        """
        assert command is not None
        if nice_level is not None:
            command = ['nice', '-n', str(nice_level)] + command
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """
        Monitor an already-running process identified by its execution tag.
        """
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        # kill the monitored process, if one exists
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        # refresh state first; True once a pid has appeared in the pidfile
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        # only valid when has_process() is True
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # refresh self._state from the pidfile; raises _PidfileException on
        # unparseable contents (state is reset first so we don't keep junk)
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # note: % binds tighter than +, so only the second literal is
        # formatted before being appended to `error`
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            # already gave up on this process; state is frozen
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
            pid=None, exit_status=None if autoserv has not yet run
            pid!=None, exit_status=None if autoserv is running
            pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException, exc:
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        Declares the process lost only after PIDFILE_TIMEOUT has elapsed.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile.  In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        # report failure: nonzero exit, no test failures recorded
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        # None while the process is still running
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        # only valid after the process has written its results
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001123
1124
class Agent(object):
    """
    A serial queue of AgentTasks owned by the dispatcher.

    Tasks run one at a time.  When a task fails, the remainder of the queue
    is discarded and the failed task's failure_tasks are scheduled in a
    brand-new Agent (see on_task_failure).

    Attributes:
        active_task -- the AgentTask currently executing, or None
        queue -- FIFO of AgentTasks not yet started
        dispatcher -- set by the dispatcher when the agent is added
        num_processes -- process-count weight used for scheduler throttling
        queue_entry_ids / host_ids -- union of ids touched by all tasks,
            used by the dispatcher to find agents by entry or host
    """
    def __init__(self, tasks, num_processes=1):
        self.active_task = None
        self.queue = Queue.Queue(0)
        self.dispatcher = None
        self.num_processes = num_processes

        self.queue_entry_ids = self._union_ids(task.queue_entry_ids
                                               for task in tasks)
        self.host_ids = self._union_ids(task.host_ids for task in tasks)

        for task in tasks:
            self.add_task(task)


    def _union_ids(self, id_lists):
        # flatten an iterable of id lists into one de-duplicated set
        return set(itertools.chain(*id_lists))


    def add_task(self, task):
        # enqueue the task and point it back at this agent
        self.queue.put_nowait(task)
        task.agent = self


    def tick(self):
        # poll the active task; advance through finished tasks until one is
        # still in progress or the queue is exhausted
        while not self.is_done():
            if self.active_task and not self.active_task.is_done():
                self.active_task.poll()
                if not self.active_task.is_done():
                    return
            self._next_task()


    def _next_task(self):
        # retire a finished active task (triggering failure handling if it
        # failed) and start the next queued task, if any
        logging.info("agent picking task")
        if self.active_task:
            assert self.active_task.is_done()

            if not self.active_task.success:
                self.on_task_failure()

        self.active_task = None
        if not self.is_done():
            self.active_task = self.queue.get_nowait()
            if self.active_task:
                self.active_task.start()


    def on_task_failure(self):
        # discard the rest of this agent's queue
        self.queue = Queue.Queue(0)
        # run failure tasks in a new Agent, so host_ids and queue_entry_ids will
        # get reset.
        new_agent = Agent(self.active_task.failure_tasks)
        self.dispatcher.add_agent(new_agent)


    def is_running(self):
        # True while a task is active (even if it finished this cycle)
        return self.active_task is not None


    def is_done(self):
        return self.active_task is None and self.queue.empty()


    def start(self):
        # the dispatcher must be attached (via add_agent) before starting
        assert self.dispatcher

        self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001193
jadmanski0afbb632008-06-06 21:10:57 +00001194
class AgentTask(object):
    """
    Base class for units of work executed serially by an Agent.

    Subclasses override prolog()/epilog() for setup/teardown and typically
    run an external command via a PidfileRunMonitor.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: command list to execute, or None for a no-op task
        @param working_directory: directory the command runs in
        @param failure_tasks: tasks to schedule (in a new Agent) if this
                task fails; defaults to no tasks
        """
        self.done = False
        # bug fix: the default used to be a mutable `[]` shared across every
        # instance constructed without an explicit failure_tasks argument
        if failure_tasks is None:
            failure_tasks = []
        self.failure_tasks = failure_tasks
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # record which hosts/entries this task touches, for dispatcher lookup
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        # a task with no monitor never started a process: fail immediately
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # exit_code None means the process is still running
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        # mark completion and run subclass teardown
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        # subclass hook run once before the task's command starts
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE: `suffix` is accepted for caller compatibility but is not
        # used by the drone manager's temporary-path allocation
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # archive the task's log file into the results repository, if any
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        # subclass hook run once after completion; base just cleans up
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        # kill any running process and finalize without an epilog
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamped per-host log path under the hosts/ results tree
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        # all entries in a group must share one execution tag
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        # push results to the repository and schedule a final reparse
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        # launch the task's command under a pidfile monitor, if there is one
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001317
1318
class RepairTask(AgentTask):
    """
    Runs `autoserv -R` to repair a host, honoring its protection level.
    On failure, the host is marked 'Repair Failed' and any associated
    non-metahost queue entry is failed.
    """
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry_to_fail = queue_entry
        # *don't* include the queue entry in IDs -- if the queue entry is
        # aborted, we want to leave the repair task running
        self._set_ids(host=host)

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path , '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # requeue the entry up front so it can be rescheduled if repair works
        if self.queue_entry_to_fail:
            self.queue_entry_to_fail.requeue()


    def _fail_queue_entry(self):
        # mark the associated queue entry failed after an unsuccessful repair
        assert self.queue_entry_to_fail

        if self.queue_entry_to_fail.meta_host:
            return  # don't fail metahost entries, they'll be reassigned

        self.queue_entry_to_fail.update_from_database()
        if self.queue_entry_to_fail.status != 'Queued':
            return  # entry has been aborted

        self.queue_entry_to_fail.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry_to_fail.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry_to_fail])
        self.queue_entry_to_fail.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            if self.queue_entry_to_fail:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001379
1380
class PreJobTask(AgentTask):
    """Base class for tasks that run on a host before a job (verify/cleanup).

    When such a task fails for a concrete (non-metahost) queue entry, its
    log file is preserved in the job's results repository so the failure
    can be inspected.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        if self.success or not self.queue_entry:
            return
        if self.queue_entry.meta_host:
            # metahost entries get reassigned elsewhere; no log to keep
            return
        self.queue_entry.set_execution_subdir()
        log_name = os.path.basename(self.log_file)
        destination = os.path.join(self.queue_entry.execution_tag(), log_name)
        _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file,
                destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001393
1394
class VerifyTask(PreJobTask):
    """Runs 'autoserv -v' to verify a host, either standalone or for an entry."""

    def __init__(self, queue_entry=None, host=None):
        # exactly one of queue_entry / host must be given
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        command = [_autoserv_path, '-p', '-v', '-m', self.host.hostname, '-r',
                   _drone_manager.absolute_path(self.temp_results_dir)]
        # if verification fails, fall back to repairing the host
        repair_fallback = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(command, self.temp_results_dir,
                                         failure_tasks=repair_fallback)

        self.set_host_log_file('verify', self.host)
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        """Flag the host (and its entry, if any) as being verified."""
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        """On success the host becomes Ready; failure is handled by PreJobTask."""
        super(VerifyTask, self).epilog()

        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001425
1426
class QueueTask(AgentTask):
    """Runs autoserv for a job across one or more host queue entries."""

    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        """Format a single key/value pair for a keyval file."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of this execution's main keyval file."""
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict's contents to the execution at keyval_path."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        """Write job-level keyvals before the job starts."""
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        """Append one keyval to the job's keyval file via the drone manager.

        Requires an attached process, since the write is paired with it.
        """
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        """Record a host's platform and labels under host_keyvals/<hostname>."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries of a job execution share one tag; use the first
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Mark entries/hosts Running and write pre-job keyvals."""
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        """Drop an error marker file when the autoserv process was lost."""
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        """Finalize the job: write end keyvals, parse results, handle loss."""
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        """Append an INFO line to the execution's status.log."""
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """Schedule cleanups or release hosts per the job's reboot_after policy."""
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # fixed typo: was "succes=%s"
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001574
1575
class RecoveryQueueTask(QueueTask):
    """A QueueTask re-attached to an autoserv process found at startup."""

    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        """Adopt the pre-existing monitor instead of launching a process."""
        self.monitor = self.run_monitor


    def prolog(self):
        """No-op: the recovered process already performed its prolog."""
        pass
mblighbb421852008-03-11 22:36:16 +00001589
1590
class CleanupTask(PreJobTask):
    """Runs 'autoserv --cleanup' on a host, falling back to repair on failure."""

    def __init__(self, host=None, queue_entry=None):
        """
        @param host - Host to clean up (mutually exclusive with queue_entry).
        @param queue_entry - Entry whose host should be cleaned; passed along
                to the fallback RepairTask so a failed repair can fail it.
        """
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        # if cleanup fails, attempt a full repair of the host
        repair_task = RepairTask(host, queue_entry=queue_entry)
        super(CleanupTask, self).__init__(self.cmd, self.temp_results_dir,
                                          failure_tasks=[repair_task])

        # NOTE(review): when constructed with host only, queue_entry is None
        # here, so queue_entries=[None] -- presumably _set_ids tolerates that;
        # confirm against its implementation.
        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        # only a successful cleanup makes the host Ready and clears 'dirty'
        if self.success:
            self.host.set_status('Ready')
            self.host.update_field('dirty', 0)
1621
1622
class AbortTask(AgentTask):
    """Aborts the active tasks of other agents working on one queue entry."""

    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        """Log which host/job pair is being aborted."""
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        """Mark the entry Aborted; an abort always counts as successful."""
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        self.success = True


    def run(self):
        """Ask each targeted agent's currently active task to abort itself."""
        for agent in self.agents_to_abort:
            active = agent.active_task
            if active:
                active.abort()
mbligh36768f02008-02-22 18:28:33 +00001647
1648
class FinalReparseTask(AgentTask):
    """Runs the results parser over a finished job's results directory.

    Instances throttle themselves against a global limit: run()/poll() keep
    retrying until a parse slot is free, then launch the parser process.
    """
    # class-wide count of parser processes currently running; compared
    # against scheduler_config.config.max_parse_processes for throttling
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        # NOTE(review): in testing mode the superclass __init__ is skipped and
        # self.cmd is set directly -- confirm no AgentTask state is needed.
        if _testing_mode:
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # True while we are below the configured parser-process limit
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        """Map autoserv's exit code onto the entries' final status."""
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        """Mark all entries as Parsing while the parser runs."""
        super(FinalReparseTask, self).prolog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        """Apply the final (Completed/Failed) status determined at init."""
        super(FinalReparseTask, self).epilog()
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        """Build the tko parser command line for this results directory."""
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        """Launch the parser if a throttle slot is free and results exist."""
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a throttle slot if we actually took one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001759
1760
class SetEntryPendingTask(AgentTask):
    """Moves a queue entry to Pending and schedules any follow-up agent."""

    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        """Notify the entry; dispatch whatever agent on_pending() returns."""
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        self.finished(True)
1773
1774
class DBError(Exception):
    """Signals that a DBObject could not be loaded from the database."""
1777
1778
class DBObject(object):
    """A miniature object relational model for the database."""

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id.  This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id.  If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        # object.__new__ takes no extra arguments; don't forward id/kwargs
        # (forwarding them was deprecated and is an error on newer Pythons).
        return super(DBObject, cls).__new__(cls)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        Load (or create) a wrapper around one database row.

        @param id - Primary key of an existing row to fetch (exclusive
                with row).
        @param row - A pre-fetched row matching _fields (exclusive with id).
        @param new_record - True if this represents a row not yet in the
                database; save() will INSERT it.
        @param always_query - If False and this instance was already
                initialized via the instance cache, skip the requery.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            row = self._fetch_row_from_db(id)

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                # use the scheduler-wide logging convention instead of print
                logging.info('initialized %s %s instance requery is updating: '
                             '%s', type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _fetch_row_from_db(self, row_id):
        """Fetch and return the row with primary key row_id.

        @raises DBError if no such row exists.
        """
        sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
        rows = _db.execute(sql, (row_id,))
        if not rows:
            # bug fix: previously interpolated the builtin id() function
            # into the message instead of the requested row id
            raise DBError("row not found (table=%s, id=%s)"
                          % (self.__table, row_id))
        return rows[0]


    def _assert_row_length(self, row):
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in zip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in zip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updatable via update_field()
        self._valid_fields.remove('id')


    def update_from_database(self):
        """Re-read this row from the database and refresh our attributes."""
        assert self.id is not None
        row = self._fetch_row_from_db(self.id)
        self._update_fields_from_row(row)


    def count(self, where, table = None):
        """Return the number of rows in table (default: ours) matching where."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """Persist a single field change, skipping no-op writes.

        @param condition - optional extra SQL appended to the WHERE clause.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """INSERT this object's fields if it was created as a new record."""
        if self.__new_record:
            keys = self._fields[1:] # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): naive "%s" quoting -- values are assumed to come
            # from trusted scheduler code, not user input.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and evict this instance from the cache."""
        # bug fix: the cache key previously used the builtin id() function
        # instead of this row's primary key, so entries were never evicted
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return string prefixed with prefix, or '' if string is empty."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001972
mbligh36768f02008-02-22 18:28:33 +00001973
class IneligibleHostQueue(DBObject):
    """ORM wrapper for a row of the ineligible_host_queues table."""
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001977
1978
class AtomicGroup(DBObject):
    """ORM wrapper for a row of the atomic_groups table."""
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines')
1982
1983
class Label(DBObject):
    """ORM wrapper for a row of the labels table."""
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001988
1989
class Host(DBObject):
    """ORM wrapper for a row of the hosts table."""
    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """Return the active, incomplete HostQueueEntry for this host.

        @returns A HostQueueEntry instance, or None if the host is idle.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        # a host should only ever be active on a single entry at once
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue whatever entry this host is currently working on."""
        logging.info("%s yielding work", self.hostname)
        # fetch once -- previously current_task() was called twice, issuing
        # the same database query a second time just to requeue
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        """Set and persist this host's status field."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [cleanup, verify] tasks used to re-verify this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002046
2047
class HostQueueEntry(DBObject):
    """ORM wrapper for a row of the host_queue_entries table."""
    _table_name = 'host_queue_entries'
    _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
               'active', 'complete', 'deleted', 'execution_subdir',
               'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002053
2054
showarda3c58572009-03-12 20:36:59 +00002055 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002056 assert id or row
showarda3c58572009-03-12 20:36:59 +00002057 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002058 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002059
jadmanski0afbb632008-06-06 21:10:57 +00002060 if self.host_id:
2061 self.host = Host(self.host_id)
2062 else:
2063 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002064
showard170873e2009-01-07 00:22:26 +00002065 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002066 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002067
2068
showard89f84db2009-03-12 20:39:13 +00002069 @classmethod
2070 def clone(cls, template):
2071 """
2072 Creates a new row using the values from a template instance.
2073
2074 The new instance will not exist in the database or have a valid
2075 id attribute until its save() method is called.
2076 """
2077 assert isinstance(template, cls)
2078 new_row = [getattr(template, field) for field in cls._fields]
2079 clone = cls(row=new_row, new_record=True)
2080 clone.id = None
2081 return clone
2082
2083
showardc85c21b2008-11-24 22:17:37 +00002084 def _view_job_url(self):
2085 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2086
2087
jadmanski0afbb632008-06-06 21:10:57 +00002088 def set_host(self, host):
2089 if host:
2090 self.queue_log_record('Assigning host ' + host.hostname)
2091 self.update_field('host_id', host.id)
2092 self.update_field('active', True)
2093 self.block_host(host.id)
2094 else:
2095 self.queue_log_record('Releasing host')
2096 self.unblock_host(self.host.id)
2097 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002098
jadmanski0afbb632008-06-06 21:10:57 +00002099 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002100
2101
jadmanski0afbb632008-06-06 21:10:57 +00002102 def get_host(self):
2103 return self.host
mbligh36768f02008-02-22 18:28:33 +00002104
2105
jadmanski0afbb632008-06-06 21:10:57 +00002106 def queue_log_record(self, log_line):
2107 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002108 _drone_manager.write_lines_to_file(self.queue_log_path,
2109 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002110
2111
jadmanski0afbb632008-06-06 21:10:57 +00002112 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002113 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002114 row = [0, self.job.id, host_id]
2115 block = IneligibleHostQueue(row=row, new_record=True)
2116 block.save()
mblighe2586682008-02-29 22:45:46 +00002117
2118
jadmanski0afbb632008-06-06 21:10:57 +00002119 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002120 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002121 blocks = IneligibleHostQueue.fetch(
2122 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2123 for block in blocks:
2124 block.delete()
mblighe2586682008-02-29 22:45:46 +00002125
2126
showard2bab8f42008-11-12 18:15:22 +00002127 def set_execution_subdir(self, subdir=None):
2128 if subdir is None:
2129 assert self.get_host()
2130 subdir = self.get_host().hostname
2131 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002132
2133
showard6355f6b2008-12-05 18:52:13 +00002134 def _get_hostname(self):
2135 if self.host:
2136 return self.host.hostname
2137 return 'no host'
2138
2139
showard170873e2009-01-07 00:22:26 +00002140 def __str__(self):
2141 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2142
2143
jadmanski0afbb632008-06-06 21:10:57 +00002144 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002145 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2146 if status not in abort_statuses:
2147 condition = ' AND '.join(['status <> "%s"' % x
2148 for x in abort_statuses])
2149 else:
2150 condition = ''
2151 self.update_field('status', status, condition=condition)
2152
showardb18134f2009-03-20 20:52:18 +00002153 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002154
showardc85c21b2008-11-24 22:17:37 +00002155 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002156 self.update_field('complete', False)
2157 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002158
jadmanski0afbb632008-06-06 21:10:57 +00002159 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002160 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002161 self.update_field('complete', False)
2162 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002163
showardc85c21b2008-11-24 22:17:37 +00002164 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002165 self.update_field('complete', True)
2166 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002167
2168 should_email_status = (status.lower() in _notify_email_statuses or
2169 'all' in _notify_email_statuses)
2170 if should_email_status:
2171 self._email_on_status(status)
2172
2173 self._email_on_job_complete()
2174
2175
2176 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002177 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002178
2179 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2180 self.job.id, self.job.name, hostname, status)
2181 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2182 self.job.id, self.job.name, hostname, status,
2183 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002184 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002185
2186
2187 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002188 if not self.job.is_finished():
2189 return
showard542e8402008-09-19 20:16:18 +00002190
showardc85c21b2008-11-24 22:17:37 +00002191 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002192 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002193 for queue_entry in hosts_queue:
2194 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002195 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002196 queue_entry.status))
2197
2198 summary_text = "\n".join(summary_text)
2199 status_counts = models.Job.objects.get_status_counts(
2200 [self.job.id])[self.job.id]
2201 status = ', '.join('%d %s' % (count, status) for status, count
2202 in status_counts.iteritems())
2203
2204 subject = 'Autotest: Job ID: %s "%s" %s' % (
2205 self.job.id, self.job.name, status)
2206 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2207 self.job.id, self.job.name, status, self._view_job_url(),
2208 summary_text)
showard170873e2009-01-07 00:22:26 +00002209 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002210
2211
showard89f84db2009-03-12 20:39:13 +00002212 def run(self, assigned_host=None):
2213 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002214 assert assigned_host
2215 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002216 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002217
showardb18134f2009-03-20 20:52:18 +00002218 logging.info("%s/%s/%s scheduled on %s, status=%s",
2219 self.job.name, self.meta_host, self.atomic_group_id,
2220 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002221
jadmanski0afbb632008-06-06 21:10:57 +00002222 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002223
showard6ae5ea92009-02-25 00:11:51 +00002224
jadmanski0afbb632008-06-06 21:10:57 +00002225 def requeue(self):
2226 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002227 # verify/cleanup failure sets the execution subdir, so reset it here
2228 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002229 if self.meta_host:
2230 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002231
2232
jadmanski0afbb632008-06-06 21:10:57 +00002233 def handle_host_failure(self):
2234 """\
2235 Called when this queue entry's host has failed verification and
2236 repair.
2237 """
2238 assert not self.meta_host
2239 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002240 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002241
2242
jadmanskif7fa2cc2008-10-01 14:13:23 +00002243 @property
2244 def aborted_by(self):
2245 self._load_abort_info()
2246 return self._aborted_by
2247
2248
2249 @property
2250 def aborted_on(self):
2251 self._load_abort_info()
2252 return self._aborted_on
2253
2254
2255 def _load_abort_info(self):
2256 """ Fetch info about who aborted the job. """
2257 if hasattr(self, "_aborted_by"):
2258 return
2259 rows = _db.execute("""
2260 SELECT users.login, aborted_host_queue_entries.aborted_on
2261 FROM aborted_host_queue_entries
2262 INNER JOIN users
2263 ON users.id = aborted_host_queue_entries.aborted_by_id
2264 WHERE aborted_host_queue_entries.queue_entry_id = %s
2265 """, (self.id,))
2266 if rows:
2267 self._aborted_by, self._aborted_on = rows[0]
2268 else:
2269 self._aborted_by = self._aborted_on = None
2270
2271
showardb2e2c322008-10-14 17:33:55 +00002272 def on_pending(self):
2273 """
2274 Called when an entry in a synchronous job has passed verify. If the
2275 job is ready to run, returns an agent to run the job. Returns None
2276 otherwise.
2277 """
2278 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002279 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002280 if self.job.is_ready():
2281 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002282 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002283 return None
2284
2285
showard170873e2009-01-07 00:22:26 +00002286 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002287 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002288 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002289 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002290
showard170873e2009-01-07 00:22:26 +00002291 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002292 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002293 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2294
2295 def execution_tag(self):
2296 assert self.execution_subdir
2297 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002298
2299
mbligh36768f02008-02-22 18:28:33 +00002300class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002301 _table_name = 'jobs'
2302 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2303 'control_type', 'created_on', 'synch_count', 'timeout',
2304 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2305
2306
showarda3c58572009-03-12 20:36:59 +00002307 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002308 assert id or row
showarda3c58572009-03-12 20:36:59 +00002309 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002310
mblighe2586682008-02-29 22:45:46 +00002311
jadmanski0afbb632008-06-06 21:10:57 +00002312 def is_server_job(self):
2313 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002314
2315
showard170873e2009-01-07 00:22:26 +00002316 def tag(self):
2317 return "%s-%s" % (self.id, self.owner)
2318
2319
jadmanski0afbb632008-06-06 21:10:57 +00002320 def get_host_queue_entries(self):
2321 rows = _db.execute("""
2322 SELECT * FROM host_queue_entries
2323 WHERE job_id= %s
2324 """, (self.id,))
2325 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002326
jadmanski0afbb632008-06-06 21:10:57 +00002327 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002328
jadmanski0afbb632008-06-06 21:10:57 +00002329 return entries
mbligh36768f02008-02-22 18:28:33 +00002330
2331
jadmanski0afbb632008-06-06 21:10:57 +00002332 def set_status(self, status, update_queues=False):
2333 self.update_field('status',status)
2334
2335 if update_queues:
2336 for queue_entry in self.get_host_queue_entries():
2337 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002338
2339
jadmanski0afbb632008-06-06 21:10:57 +00002340 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002341 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2342 status='Pending')
2343 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002344
2345
jadmanski0afbb632008-06-06 21:10:57 +00002346 def num_machines(self, clause = None):
2347 sql = "job_id=%s" % self.id
2348 if clause:
2349 sql += " AND (%s)" % clause
2350 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002351
2352
jadmanski0afbb632008-06-06 21:10:57 +00002353 def num_queued(self):
2354 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002355
2356
jadmanski0afbb632008-06-06 21:10:57 +00002357 def num_active(self):
2358 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002359
2360
jadmanski0afbb632008-06-06 21:10:57 +00002361 def num_complete(self):
2362 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002363
2364
jadmanski0afbb632008-06-06 21:10:57 +00002365 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002366 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002367
mbligh36768f02008-02-22 18:28:33 +00002368
showard6bb7c292009-01-30 01:44:51 +00002369 def _not_yet_run_entries(self, include_verifying=True):
2370 statuses = [models.HostQueueEntry.Status.QUEUED,
2371 models.HostQueueEntry.Status.PENDING]
2372 if include_verifying:
2373 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2374 return models.HostQueueEntry.objects.filter(job=self.id,
2375 status__in=statuses)
2376
2377
2378 def _stop_all_entries(self):
2379 entries_to_stop = self._not_yet_run_entries(
2380 include_verifying=False)
2381 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002382 assert not child_entry.complete, (
2383 '%s status=%s, active=%s, complete=%s' %
2384 (child_entry.id, child_entry.status, child_entry.active,
2385 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002386 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2387 child_entry.host.status = models.Host.Status.READY
2388 child_entry.host.save()
2389 child_entry.status = models.HostQueueEntry.Status.STOPPED
2390 child_entry.save()
2391
showard2bab8f42008-11-12 18:15:22 +00002392 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002393 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002394 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002395 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002396
2397
jadmanski0afbb632008-06-06 21:10:57 +00002398 def write_to_machines_file(self, queue_entry):
2399 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002400 file_path = os.path.join(self.tag(), '.machines')
2401 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002402
2403
showard2bab8f42008-11-12 18:15:22 +00002404 def _next_group_name(self):
2405 query = models.HostQueueEntry.objects.filter(
2406 job=self.id).values('execution_subdir').distinct()
2407 subdirs = (entry['execution_subdir'] for entry in query)
2408 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2409 ids = [int(match.group(1)) for match in groups if match]
2410 if ids:
2411 next_id = max(ids) + 1
2412 else:
2413 next_id = 0
2414 return "group%d" % next_id
2415
2416
showard170873e2009-01-07 00:22:26 +00002417 def _write_control_file(self, execution_tag):
2418 control_path = _drone_manager.attach_file_to_execution(
2419 execution_tag, self.control_file)
2420 return control_path
mbligh36768f02008-02-22 18:28:33 +00002421
showardb2e2c322008-10-14 17:33:55 +00002422
showard2bab8f42008-11-12 18:15:22 +00002423 def get_group_entries(self, queue_entry_from_group):
2424 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002425 return list(HostQueueEntry.fetch(
2426 where='job_id=%s AND execution_subdir=%s',
2427 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002428
2429
showardb2e2c322008-10-14 17:33:55 +00002430 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002431 assert queue_entries
2432 execution_tag = queue_entries[0].execution_tag()
2433 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002434 hostnames = ','.join([entry.get_host().hostname
2435 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002436
showard170873e2009-01-07 00:22:26 +00002437 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2438 '-r', _drone_manager.absolute_path(execution_tag),
2439 '-u', self.owner, '-l', self.name, '-m', hostnames,
2440 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002441
jadmanski0afbb632008-06-06 21:10:57 +00002442 if not self.is_server_job():
2443 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002444
showardb2e2c322008-10-14 17:33:55 +00002445 return params
mblighe2586682008-02-29 22:45:46 +00002446
mbligh36768f02008-02-22 18:28:33 +00002447
showardc9ae1782009-01-30 01:42:37 +00002448 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002449 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002450 return True
showard0fc38302008-10-23 00:44:07 +00002451 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002452 return queue_entry.get_host().dirty
2453 return False
showard21baa452008-10-21 00:08:39 +00002454
showardc9ae1782009-01-30 01:42:37 +00002455
2456 def _should_run_verify(self, queue_entry):
2457 do_not_verify = (queue_entry.host.protection ==
2458 host_protections.Protection.DO_NOT_VERIFY)
2459 if do_not_verify:
2460 return False
2461 return self.run_verify
2462
2463
2464 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002465 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002466 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002467 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002468 if self._should_run_verify(queue_entry):
2469 tasks.append(VerifyTask(queue_entry=queue_entry))
2470 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002471 return tasks
2472
2473
showard2bab8f42008-11-12 18:15:22 +00002474 def _assign_new_group(self, queue_entries):
2475 if len(queue_entries) == 1:
2476 group_name = queue_entries[0].get_host().hostname
2477 else:
2478 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002479 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002480 self.id, [entry.host.hostname for entry in queue_entries],
2481 group_name)
2482
2483 for queue_entry in queue_entries:
2484 queue_entry.set_execution_subdir(group_name)
2485
2486
2487 def _choose_group_to_run(self, include_queue_entry):
2488 chosen_entries = [include_queue_entry]
2489
2490 num_entries_needed = self.synch_count - 1
2491 if num_entries_needed > 0:
2492 pending_entries = HostQueueEntry.fetch(
2493 where='job_id = %s AND status = "Pending" AND id != %s',
2494 params=(self.id, include_queue_entry.id))
2495 chosen_entries += list(pending_entries)[:num_entries_needed]
2496
2497 self._assign_new_group(chosen_entries)
2498 return chosen_entries
2499
2500
2501 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002502 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002503 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2504 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002505
showard2bab8f42008-11-12 18:15:22 +00002506 queue_entries = self._choose_group_to_run(queue_entry)
2507 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002508
2509
2510 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002511 for queue_entry in queue_entries:
2512 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002513 params = self._get_autoserv_params(queue_entries)
2514 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2515 cmd=params)
2516 tasks = initial_tasks + [queue_task]
2517 entry_ids = [entry.id for entry in queue_entries]
2518
showard170873e2009-01-07 00:22:26 +00002519 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002520
2521
mbligh36768f02008-02-22 18:28:33 +00002522if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002523 main()