blob: 11b7f2d30bc176cba27a862242e857f5a6219e13 [file] [log] [blame]
mbligh36768f02008-02-22 18:28:33 +00001#!/usr/bin/python -u
2
3"""
4Autotest scheduler
5"""
showard909c7a62008-07-15 21:52:38 +00006
mbligh36768f02008-02-22 18:28:33 +00007
showard89f84db2009-03-12 20:39:13 +00008import datetime, errno, optparse, os, pwd, Queue, random, re, shutil, signal
showard542e8402008-09-19 20:16:18 +00009import smtplib, socket, stat, subprocess, sys, tempfile, time, traceback
showardb18134f2009-03-20 20:52:18 +000010import itertools, logging, logging.config, weakref
mbligh70feeee2008-06-11 16:20:49 +000011import common
mbligh8bcd23a2009-02-03 19:14:06 +000012import MySQLdb
showard21baa452008-10-21 00:08:39 +000013from autotest_lib.frontend import setup_django_environment
showard542e8402008-09-19 20:16:18 +000014from autotest_lib.client.common_lib import global_config
showardb18134f2009-03-20 20:52:18 +000015from autotest_lib.client.common_lib import host_protections, utils
showardb1e51872008-10-07 11:08:18 +000016from autotest_lib.database import database_connection
showard21baa452008-10-21 00:08:39 +000017from autotest_lib.frontend.afe import models
showard170873e2009-01-07 00:22:26 +000018from autotest_lib.scheduler import drone_manager, drones, email_manager
showardd1ee1dd2009-01-07 21:33:08 +000019from autotest_lib.scheduler import status_server, scheduler_config
mbligh70feeee2008-06-11 16:20:49 +000020
mblighb090f142008-02-27 21:33:46 +000021
RESULTS_DIR = '.'
AUTOSERV_NICE_LEVEL = 10
DB_CONFIG_SECTION = 'AUTOTEST_WEB'

AUTOTEST_PATH = os.path.join(os.path.dirname(__file__), '..')

# Allow the install location to be overridden from the environment.
# (uses 'in' rather than the deprecated dict.has_key())
if 'AUTOTEST_DIR' in os.environ:
    AUTOTEST_PATH = os.environ['AUTOTEST_DIR']
AUTOTEST_SERVER_DIR = os.path.join(AUTOTEST_PATH, 'server')
AUTOTEST_TKO_DIR = os.path.join(AUTOTEST_PATH, 'tko')

if AUTOTEST_SERVER_DIR not in sys.path:
    sys.path.insert(0, AUTOTEST_SERVER_DIR)

# how long to wait for autoserv to write a pidfile
PIDFILE_TIMEOUT = 5 * 60  # 5 min

# error message to leave in results dir when an autoserv process disappears
# mysteriously
_LOST_PROCESS_ERROR = """\
Autoserv failed abnormally during execution for this job, probably due to a
system error on the Autotest server.  Full results may not be available.  Sorry.
"""

# module-level scheduler state, initialized by main()/init()
_db = None
_shutdown = False
_autoserv_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'server', 'autoserv')
_parser_path = os.path.join(drones.AUTOTEST_INSTALL_DIR, 'tko', 'parse')
_testing_mode = False
_base_url = None
_notify_email_statuses = []
_drone_manager = drone_manager.DroneManager()

# load the logging settings
scheduler_dir = os.path.join(AUTOTEST_PATH, 'scheduler')
os.environ['AUTOTEST_SCHEDULER_LOG_DIR'] = os.path.join(AUTOTEST_PATH, 'logs')
# Here we export the log name, using the same convention as autoserv's results
# directory.
scheduler_log_name = 'scheduler.log.%s' % time.strftime('%Y-%m-%d-%H.%M.%S')
os.environ['AUTOTEST_SCHEDULER_LOG_NAME'] = scheduler_log_name
logging.config.fileConfig(os.path.join(scheduler_dir, 'debug_scheduler.ini'))
63
mbligh36768f02008-02-22 18:28:33 +000064
def main():
    """Scheduler entry point.

    Parses command-line options, loads configuration, starts the status
    server, then runs the dispatcher tick loop until a shutdown is
    requested (see handle_sigint), finally tearing everything down.
    """
    usage = 'usage: %prog [options] results_dir'

    parser = optparse.OptionParser(usage)
    parser.add_option('--recover-hosts', help='Try to recover dead hosts',
                      action='store_true')
    parser.add_option('--logfile', help='Set a log file that all stdout ' +
                      'should be redirected to.  Stderr will go to this ' +
                      'file + ".err"')
    parser.add_option('--test', help='Indicate that scheduler is under ' +
                      'test and should use dummy autoserv and no parsing',
                      action='store_true')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.print_usage()
        return

    global RESULTS_DIR
    RESULTS_DIR = args[0]

    # Change the cwd while running to avoid issues incase we were launched from
    # somewhere odd (such as a random NFS home directory of the person running
    # sudo to launch us as the appropriate user).
    os.chdir(RESULTS_DIR)

    c = global_config.global_config
    notify_statuses_list = c.get_config_value(scheduler_config.CONFIG_SECTION,
                                              "notify_email_statuses",
                                              default='')
    global _notify_email_statuses
    # the config value is a free-form list split on whitespace/,/;/:
    _notify_email_statuses = [status for status in
                              re.split(r'[\s,;:]', notify_statuses_list.lower())
                              if status]

    if options.test:
        global _autoserv_path
        _autoserv_path = 'autoserv_dummy'
        global _testing_mode
        _testing_mode = True

    # AUTOTEST_WEB.base_url is still a supported config option as some people
    # may wish to override the entire url.
    global _base_url
    config_base_url = c.get_config_value(DB_CONFIG_SECTION, 'base_url',
                                         default='')
    if config_base_url:
        _base_url = config_base_url
    else:
        # For the common case of everything running on a single server you
        # can just set the hostname in a single place in the config file.
        server_name = c.get_config_value('SERVER', 'hostname')
        if not server_name:
            logging.critical('[SERVER] hostname missing from the config file.')
            sys.exit(1)
        _base_url = 'http://%s/afe/' % server_name

    server = status_server.StatusServer(_drone_manager)
    server.start()

    try:
        init(options.logfile)
        dispatcher = Dispatcher()
        dispatcher.do_initial_recovery(recover_hosts=options.recover_hosts)

        while not _shutdown:
            dispatcher.tick()
            time.sleep(scheduler_config.config.tick_pause_sec)
    except:
        # top-level boundary: report the stacktrace by email, then fall
        # through to the shutdown/cleanup path below
        email_manager.manager.log_stacktrace(
            "Uncaught exception; terminating monitor_db")

    email_manager.manager.send_queued_emails()
    server.shutdown()
    _drone_manager.shutdown()
    _db.disconnect()
mbligh36768f02008-02-22 18:28:33 +0000140
141
def handle_sigint(signum, frame):
    """SIGINT handler: ask the main loop to exit after the current tick."""
    global _shutdown
    _shutdown = True
    logging.info("Shutdown request received.")
mbligh36768f02008-02-22 18:28:33 +0000146
147
def init(logfile):
    """Initialize scheduler state before the tick loop starts.

    Sets up log redirection, the (possibly test-mode) database connection,
    Django autocommit, the SIGINT handler and the drone manager.

    @param logfile: optional path; when set, stdout/stderr are redirected
            there via enable_logging().
    """
    if logfile:
        enable_logging(logfile)
    logging.info("%s> dispatcher starting", time.strftime("%X %x"))
    logging.info("My PID is %d", os.getpid())

    if _testing_mode:
        global_config.global_config.override_config_value(
            DB_CONFIG_SECTION, 'database', 'stresstest_autotest_web')

    os.environ['PATH'] = AUTOTEST_SERVER_DIR + ':' + os.environ['PATH']
    global _db
    _db = database_connection.DatabaseConnection(DB_CONFIG_SECTION)
    _db.connect()

    # ensure Django connection is in autocommit
    setup_django_environment.enable_autocommit()

    logging.info("Setting signal handler")
    signal.signal(signal.SIGINT, handle_sigint)

    # renamed local from 'drones' so it no longer shadows the imported
    # 'drones' module
    drones_csv = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'drones', default='localhost')
    drone_list = [hostname.strip() for hostname in drones_csv.split(',')]
    results_host = global_config.global_config.get_config_value(
        scheduler_config.CONFIG_SECTION, 'results_host', default='localhost')
    _drone_manager.initialize(RESULTS_DIR, drone_list, results_host)

    logging.info("Connected! Running...")
mbligh36768f02008-02-22 18:28:33 +0000177
178
def enable_logging(logfile):
    """Redirect this process's stdout/stderr to logfile and logfile + '.err'.

    Both streams are opened unbuffered in append mode and dup2()ed over the
    real stdout/stderr descriptors so subprocesses inherit them as well.
    """
    out_file = logfile
    err_file = "%s.err" % logfile
    logging.info("Enabling logging to %s (%s)", out_file, err_file)
    stdout_stream = open(out_file, "a", buffering=0)
    stderr_stream = open(err_file, "a", buffering=0)

    # replace the underlying file descriptors first, then rebind the
    # Python-level stream objects
    os.dup2(stdout_stream.fileno(), sys.stdout.fileno())
    os.dup2(stderr_stream.fileno(), sys.stderr.fileno())

    sys.stdout = stdout_stream
    sys.stderr = stderr_stream
mbligh36768f02008-02-22 18:28:33 +0000191
192
def queue_entries_to_abort():
    """Fetch a HostQueueEntry for every row currently in 'Abort' status."""
    abort_rows = _db.execute("""
                    SELECT * FROM host_queue_entries WHERE status='Abort';
                    """)
    return [HostQueueEntry(row=abort_row) for abort_row in abort_rows]
mbligh36768f02008-02-22 18:28:33 +0000200
showard7cf9a9b2008-05-15 21:15:52 +0000201
class SchedulerError(Exception):
    """Raised by HostScheduler when an inconsistent state occurs.

    For example, when a host carries more than one atomic group label
    (see HostScheduler._get_host_atomic_group_id).
    """
204
205
class HostScheduler(object):
    """Matches pending HostQueueEntries to available hosts.

    Eligibility takes into account ACL groups, job label dependencies,
    only_if_needed labels and atomic group membership.  refresh() must be
    called each scheduling cycle to (re)load the cached DB state before
    any of the find_eligible_* methods are used.
    """

    def _get_ready_hosts(self):
        # avoid any host with a currently active queue entry against it
        hosts = Host.fetch(
            joins='LEFT JOIN host_queue_entries AS active_hqe '
                  'ON (hosts.id = active_hqe.host_id AND '
                  'active_hqe.active)',
            where="active_hqe.host_id IS NULL "
                  "AND NOT hosts.locked "
                  "AND (hosts.status IS NULL OR hosts.status = 'Ready')")
        return dict((host.id, host) for host in hosts)


    @staticmethod
    def _get_sql_id_list(id_list):
        """Render an id sequence as a comma-separated SQL IN () list."""
        return ','.join(str(item_id) for item_id in id_list)


    @classmethod
    def _get_many2many_dict(cls, query, id_list, flip=False):
        """Run query (with a %s IN-list placeholder) and build a mapping of
        first column -> set of second column (reversed when flip=True)."""
        if not id_list:
            return {}
        query %= cls._get_sql_id_list(id_list)
        rows = _db.execute(query)
        return cls._process_many2many_dict(rows, flip)


    @staticmethod
    def _process_many2many_dict(rows, flip=False):
        """Fold (left, right) rows into {left_id: set(right_ids)}."""
        result = {}
        for row in rows:
            left_id, right_id = int(row[0]), int(row[1])
            if flip:
                left_id, right_id = right_id, left_id
            result.setdefault(left_id, set()).add(right_id)
        return result


    @classmethod
    def _get_job_acl_groups(cls, job_ids):
        """Map job id -> set of ACL group ids of the job owner."""
        query = """
            SELECT jobs.id, acl_groups_users.aclgroup_id
            FROM jobs
            INNER JOIN users ON users.login = jobs.owner
            INNER JOIN acl_groups_users ON acl_groups_users.user_id = users.id
            WHERE jobs.id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_ineligible_hosts(cls, job_ids):
        """Map job id -> set of host ids the job may not run on."""
        query = """
            SELECT job_id, host_id
            FROM ineligible_host_queues
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_job_dependencies(cls, job_ids):
        """Map job id -> set of label ids the job depends on."""
        query = """
            SELECT job_id, label_id
            FROM jobs_dependency_labels
            WHERE job_id IN (%s)
            """
        return cls._get_many2many_dict(query, job_ids)


    @classmethod
    def _get_host_acls(cls, host_ids):
        """Map host id -> set of ACL group ids the host belongs to."""
        query = """
            SELECT host_id, aclgroup_id
            FROM acl_groups_hosts
            WHERE host_id IN (%s)
            """
        return cls._get_many2many_dict(query, host_ids)


    @classmethod
    def _get_label_hosts(cls, host_ids):
        """Return (label_id -> host_ids, host_id -> label_ids) mappings."""
        if not host_ids:
            return {}, {}
        query = """
            SELECT label_id, host_id
            FROM hosts_labels
            WHERE host_id IN (%s)
            """ % cls._get_sql_id_list(host_ids)
        rows = _db.execute(query)
        labels_to_hosts = cls._process_many2many_dict(rows)
        hosts_to_labels = cls._process_many2many_dict(rows, flip=True)
        return labels_to_hosts, hosts_to_labels


    @classmethod
    def _get_labels(cls):
        """Map label id -> Label object for all labels."""
        return dict((label.id, label) for label in Label.fetch())


    def refresh(self, pending_queue_entries):
        """Reload all cached scheduling state from the database for the
        given pending queue entries."""
        self._hosts_available = self._get_ready_hosts()

        relevant_jobs = [queue_entry.job_id
                         for queue_entry in pending_queue_entries]
        self._job_acls = self._get_job_acl_groups(relevant_jobs)
        self._ineligible_hosts = self._get_job_ineligible_hosts(relevant_jobs)
        self._job_dependencies = self._get_job_dependencies(relevant_jobs)

        host_ids = self._hosts_available.keys()
        self._host_acls = self._get_host_acls(host_ids)
        self._label_hosts, self._host_labels = self._get_label_hosts(host_ids)

        self._labels = self._get_labels()


    def _is_acl_accessible(self, host_id, queue_entry):
        # the job owner and the host must share at least one ACL group
        job_acls = self._job_acls.get(queue_entry.job_id, set())
        host_acls = self._host_acls.get(host_id, set())
        return len(host_acls.intersection(job_acls)) > 0


    def _check_job_dependencies(self, job_dependencies, host_labels):
        """True when the host carries every label the job depends on."""
        missing = job_dependencies - host_labels
        return len(missing) == 0


    def _check_only_if_needed_labels(self, job_dependencies, host_labels,
                                     queue_entry):
        """True unless the host has an only_if_needed label that the job
        neither depends on nor requested as its metahost."""
        if not queue_entry.meta_host:
            # bypass only_if_needed labels when a specific host is selected
            return True

        for label_id in host_labels:
            label = self._labels[label_id]
            if not label.only_if_needed:
                # we don't care about non-only_if_needed labels
                continue
            if queue_entry.meta_host == label_id:
                # if the label was requested in a metahost it's OK
                continue
            if label_id not in job_dependencies:
                return False
        return True


    def _check_atomic_group_labels(self, host_labels, queue_entry):
        """
        Determine if the given HostQueueEntry's atomic group settings are okay
        to schedule on a host with the given labels.

        @param host_labels - A list of label ids that the host has.
        @param queue_entry - The HostQueueEntry being considered for the host.

        @returns True if atomic group settings are okay, False otherwise.
        """
        return (self._get_host_atomic_group_id(host_labels) ==
                queue_entry.atomic_group_id)


    def _get_host_atomic_group_id(self, host_labels):
        """
        Return the atomic group label id for a host with the given set of
        labels if any, or None otherwise.  Raises an exception if more than
        one atomic group are found in the set of labels.

        @param host_labels - A list of label ids that the host has.

        @returns The id of the atomic group found on a label in host_labels
                or None if no atomic group label is found.
        @raises SchedulerError - If more than one atomic group label is found.
        """
        atomic_ids = [self._labels[label_id].atomic_group_id
                      for label_id in host_labels
                      if self._labels[label_id].atomic_group_id is not None]
        if not atomic_ids:
            return None
        if len(atomic_ids) > 1:
            raise SchedulerError('More than one atomic label on host.')
        return atomic_ids[0]


    def _get_atomic_group_labels(self, atomic_group_id):
        """
        Lookup the label ids that an atomic_group is associated with.

        @param atomic_group_id - The id of the AtomicGroup to look up.

        @returns A generator yielding Label ids for this atomic group.
        """
        # note: loop variable renamed so it no longer shadows builtin id()
        return (label_id for label_id, label in self._labels.iteritems()
                if label.atomic_group_id == atomic_group_id
                and not label.invalid)


    def _get_eligible_hosts_in_group(self, group_hosts, queue_entry):
        """
        @param group_hosts - A sequence of Host ids to test for usability
                and eligibility against the Job associated with queue_entry.
        @param queue_entry - The HostQueueEntry that these hosts are being
                tested for eligibility against.

        @returns A subset of group_hosts Host ids that are eligible for the
                supplied queue_entry.
        """
        return set(host_id for host_id in group_hosts
                   if self._is_host_usable(host_id)
                   and self._is_host_eligible_for_job(host_id, queue_entry))


    def _is_host_eligible_for_job(self, host_id, queue_entry):
        """Combine all per-host eligibility checks for queue_entry."""
        job_dependencies = self._job_dependencies.get(queue_entry.job_id, set())
        host_labels = self._host_labels.get(host_id, set())

        return (self._is_acl_accessible(host_id, queue_entry) and
                self._check_job_dependencies(job_dependencies, host_labels) and
                self._check_only_if_needed_labels(
                    job_dependencies, host_labels, queue_entry) and
                self._check_atomic_group_labels(host_labels, queue_entry))


    def _schedule_non_metahost(self, queue_entry):
        """Claim the entry's specific host if it is eligible, else None."""
        if not self._is_host_eligible_for_job(queue_entry.host_id, queue_entry):
            return None
        return self._hosts_available.pop(queue_entry.host_id, None)


    def _is_host_usable(self, host_id):
        if host_id not in self._hosts_available:
            # host was already used during this scheduling cycle
            return False
        if self._hosts_available[host_id].invalid:
            # Invalid hosts cannot be used for metahosts.  They're included in
            # the original query because they can be used by non-metahosts.
            return False
        return True


    def _schedule_metahost(self, queue_entry):
        """Claim any eligible host carrying the entry's metahost label."""
        label_id = queue_entry.meta_host
        hosts_in_label = self._label_hosts.get(label_id, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # must iterate over a copy so we can mutate the original while iterating
        for host_id in list(hosts_in_label):
            if not self._is_host_usable(host_id):
                hosts_in_label.remove(host_id)
                continue
            if host_id in ineligible_host_ids:
                continue
            if not self._is_host_eligible_for_job(host_id, queue_entry):
                continue

            # Remove the host from our cached internal state before returning
            # the host object.
            hosts_in_label.remove(host_id)
            return self._hosts_available.pop(host_id)
        return None


    def find_eligible_host(self, queue_entry):
        """Find and claim a host for a (non-atomic-group) queue entry."""
        if not queue_entry.meta_host:
            assert queue_entry.host_id is not None
            return self._schedule_non_metahost(queue_entry)
        assert queue_entry.atomic_group_id is None
        return self._schedule_metahost(queue_entry)


    def find_eligible_atomic_group(self, queue_entry):
        """
        Given an atomic group host queue entry, locate an appropriate group
        of hosts for the associated job to run on.

        The caller is responsible for creating new HQEs for the additional
        hosts returned in order to run the actual job on them.

        @returns A list of Host instances in a ready state to satisfy this
                atomic group scheduling.  Hosts will all belong to the same
                atomic group label as specified by the queue_entry.
                An empty list will be returned if no suitable atomic
                group could be found.

        TODO(gps): what is responsible for kicking off any attempted repairs on
        a group of hosts?  not this function, but something needs to.  We do
        not communicate that reason for returning [] outside of here...
        For now, we'll just be unschedulable if enough hosts within one group
        enter Repair Failed state.
        """
        assert queue_entry.atomic_group_id is not None
        job = queue_entry.job
        assert job.synch_count and job.synch_count > 0
        atomic_group = AtomicGroup(id=queue_entry.atomic_group_id)
        if job.synch_count > atomic_group.max_number_of_machines:
            # Such a Job and HostQueueEntry should never be possible to
            # create using the frontend.  Regardless, we can't process it.
            # Abort it immediately and log an error on the scheduler.
            queue_entry.set_status(models.HostQueueEntry.Status.ABORTED)
            # bug fix: this previously called the undefined name 'bprint',
            # which would raise NameError; report via the scheduler log.
            logging.error('Error: job %d synch_count=%d > requested '
                          'atomic_group %d max_number_of_machines=%d. '
                          'Aborted host_queue_entry %d.',
                          job.id, job.synch_count, atomic_group.id,
                          atomic_group.max_number_of_machines, queue_entry.id)
            return []
        hosts_in_label = self._label_hosts.get(queue_entry.meta_host, set())
        ineligible_host_ids = self._ineligible_hosts.get(queue_entry.job_id,
                                                         set())

        # Look in each label associated with atomic_group until we find one with
        # enough hosts to satisfy the job.
        for atomic_label_id in self._get_atomic_group_labels(atomic_group.id):
            group_hosts = set(self._label_hosts.get(atomic_label_id, set()))
            if queue_entry.meta_host is not None:
                # If we have a metahost label, only allow its hosts.
                group_hosts.intersection_update(hosts_in_label)
            group_hosts -= ineligible_host_ids
            eligible_hosts_in_group = self._get_eligible_hosts_in_group(
                group_hosts, queue_entry)

            # Job.synch_count is treated as "minimum synch count" when
            # scheduling for an atomic group of hosts.  The atomic group
            # number of machines is the maximum to pick out of a single
            # atomic group label for scheduling at one time.
            min_hosts = job.synch_count
            max_hosts = atomic_group.max_number_of_machines

            if len(eligible_hosts_in_group) < min_hosts:
                # Not enough eligible hosts in this atomic group label.
                continue

            # Limit ourselves to scheduling the atomic group size.
            if len(eligible_hosts_in_group) > max_hosts:
                eligible_hosts_in_group = random.sample(
                    eligible_hosts_in_group, max_hosts)

            # Remove the selected hosts from our cached internal state
            # of available hosts in order to return the Host objects.
            host_list = []
            for host_id in eligible_hosts_in_group:
                hosts_in_label.discard(host_id)
                host_list.append(self._hosts_available.pop(host_id))
            return host_list

        return []
549
550
showard170873e2009-01-07 00:22:26 +0000551class Dispatcher(object):
jadmanski0afbb632008-06-06 21:10:57 +0000552 def __init__(self):
553 self._agents = []
showard3bb499f2008-07-03 19:42:20 +0000554 self._last_clean_time = time.time()
showard63a34772008-08-18 19:32:50 +0000555 self._host_scheduler = HostScheduler()
showard170873e2009-01-07 00:22:26 +0000556 self._host_agents = {}
557 self._queue_entry_agents = {}
mbligh36768f02008-02-22 18:28:33 +0000558
mbligh36768f02008-02-22 18:28:33 +0000559
jadmanski0afbb632008-06-06 21:10:57 +0000560 def do_initial_recovery(self, recover_hosts=True):
561 # always recover processes
562 self._recover_processes()
mblighbb421852008-03-11 22:36:16 +0000563
jadmanski0afbb632008-06-06 21:10:57 +0000564 if recover_hosts:
565 self._recover_hosts()
mbligh36768f02008-02-22 18:28:33 +0000566
567
    def tick(self):
        """One scheduler iteration.

        Refreshes drone state, runs periodic cleanup if due, processes
        aborts, schedules new jobs, drives existing agents, then flushes
        pending drone actions and queued email.  The call order matters.
        """
        _drone_manager.refresh()
        self._run_cleanup_maybe()
        self._find_aborting()
        self._schedule_new_jobs()
        self._handle_agents()
        _drone_manager.execute_actions()
        email_manager.manager.send_queued_emails()
mbligh36768f02008-02-22 18:28:33 +0000576
showard97aed502008-11-04 02:01:24 +0000577
showarda3ab0d52008-11-03 19:03:47 +0000578 def _run_cleanup_maybe(self):
showardd1ee1dd2009-01-07 21:33:08 +0000579 should_cleanup = (self._last_clean_time +
580 scheduler_config.config.clean_interval * 60 <
581 time.time())
582 if should_cleanup:
showardb18134f2009-03-20 20:52:18 +0000583 logging.info('Running cleanup')
showarda3ab0d52008-11-03 19:03:47 +0000584 self._abort_timed_out_jobs()
585 self._abort_jobs_past_synch_start_timeout()
586 self._clear_inactive_blocks()
showardfa8629c2008-11-04 16:51:23 +0000587 self._check_for_db_inconsistencies()
showarda3ab0d52008-11-03 19:03:47 +0000588 self._last_clean_time = time.time()
589
mbligh36768f02008-02-22 18:28:33 +0000590
showard170873e2009-01-07 00:22:26 +0000591 def _register_agent_for_ids(self, agent_dict, object_ids, agent):
592 for object_id in object_ids:
593 agent_dict.setdefault(object_id, set()).add(agent)
594
595
596 def _unregister_agent_for_ids(self, agent_dict, object_ids, agent):
597 for object_id in object_ids:
598 assert object_id in agent_dict
599 agent_dict[object_id].remove(agent)
600
601
jadmanski0afbb632008-06-06 21:10:57 +0000602 def add_agent(self, agent):
603 self._agents.append(agent)
604 agent.dispatcher = self
showard170873e2009-01-07 00:22:26 +0000605 self._register_agent_for_ids(self._host_agents, agent.host_ids, agent)
606 self._register_agent_for_ids(self._queue_entry_agents,
607 agent.queue_entry_ids, agent)
mblighd5c95802008-03-05 00:33:46 +0000608
showard170873e2009-01-07 00:22:26 +0000609
610 def get_agents_for_entry(self, queue_entry):
611 """
612 Find agents corresponding to the specified queue_entry.
613 """
614 return self._queue_entry_agents.get(queue_entry.id, set())
615
616
617 def host_has_agent(self, host):
618 """
619 Determine if there is currently an Agent present using this host.
620 """
621 return bool(self._host_agents.get(host.id, None))
mbligh36768f02008-02-22 18:28:33 +0000622
623
jadmanski0afbb632008-06-06 21:10:57 +0000624 def remove_agent(self, agent):
625 self._agents.remove(agent)
showard170873e2009-01-07 00:22:26 +0000626 self._unregister_agent_for_ids(self._host_agents, agent.host_ids,
627 agent)
628 self._unregister_agent_for_ids(self._queue_entry_agents,
629 agent.queue_entry_ids, agent)
showardec113162008-05-08 00:52:49 +0000630
631
showard4c5374f2008-09-04 17:02:56 +0000632 def num_running_processes(self):
633 return sum(agent.num_processes for agent in self._agents
634 if agent.is_running())
mblighbb421852008-03-11 22:36:16 +0000635
636
showard170873e2009-01-07 00:22:26 +0000637 def _extract_execution_tag(self, command_line):
638 match = re.match(r'.* -P (\S+) ', command_line)
639 if not match:
640 return None
641 return match.group(1)
mblighbb421852008-03-11 22:36:16 +0000642
643
showard2bab8f42008-11-12 18:15:22 +0000644 def _recover_queue_entries(self, queue_entries, run_monitor):
645 assert len(queue_entries) > 0
showard2bab8f42008-11-12 18:15:22 +0000646 queue_task = RecoveryQueueTask(job=queue_entries[0].job,
647 queue_entries=queue_entries,
648 run_monitor=run_monitor)
jadmanski0afbb632008-06-06 21:10:57 +0000649 self.add_agent(Agent(tasks=[queue_task],
showard170873e2009-01-07 00:22:26 +0000650 num_processes=len(queue_entries)))
mblighbb421852008-03-11 22:36:16 +0000651
652
    def _recover_processes(self):
        """Recover scheduler state after a restart.

        Registers pidfiles, re-attaches to still-running autoserv
        processes, resumes aborts, requeues unrecoverable entries and
        reverifies remaining hosts.  The call order is significant.
        """
        self._register_pidfiles()
        _drone_manager.refresh()
        self._recover_running_entries()
        self._recover_aborting_entries()
        self._requeue_other_active_entries()
        self._recover_parsing_entries()
        self._reverify_remaining_hosts()
        # reinitialize drones after killing orphaned processes, since they can
        # leave around files when they die
        _drone_manager.execute_actions()
        _drone_manager.reinitialize_drones()
mblighbb421852008-03-11 22:36:16 +0000665
showard170873e2009-01-07 00:22:26 +0000666
667 def _register_pidfiles(self):
668 # during recovery we may need to read pidfiles for both running and
669 # parsing entries
670 queue_entries = HostQueueEntry.fetch(
671 where="status IN ('Running', 'Parsing')")
jadmanski0afbb632008-06-06 21:10:57 +0000672 for queue_entry in queue_entries:
showard170873e2009-01-07 00:22:26 +0000673 pidfile_id = _drone_manager.get_pidfile_id_from(
674 queue_entry.execution_tag())
675 _drone_manager.register_pidfile(pidfile_id)
676
677
678 def _recover_running_entries(self):
679 orphans = _drone_manager.get_orphaned_autoserv_processes()
680
681 queue_entries = HostQueueEntry.fetch(where="status = 'Running'")
682 requeue_entries = []
683 for queue_entry in queue_entries:
684 if self.get_agents_for_entry(queue_entry):
jadmanski0afbb632008-06-06 21:10:57 +0000685 # synchronous job we've already recovered
686 continue
showard170873e2009-01-07 00:22:26 +0000687 execution_tag = queue_entry.execution_tag()
688 run_monitor = PidfileRunMonitor()
689 run_monitor.attach_to_existing_process(execution_tag)
690 if not run_monitor.has_process():
691 # autoserv apparently never got run, so let it get requeued
692 continue
showarde788ea62008-11-17 21:02:47 +0000693 queue_entries = queue_entry.job.get_group_entries(queue_entry)
showardb18134f2009-03-20 20:52:18 +0000694 logging.info('Recovering %s (process %s)',
695 (', '.join(str(entry) for entry in queue_entries),
696 run_monitor.get_process()))
showard2bab8f42008-11-12 18:15:22 +0000697 self._recover_queue_entries(queue_entries, run_monitor)
showard170873e2009-01-07 00:22:26 +0000698 orphans.pop(execution_tag, None)
mbligh90a549d2008-03-25 23:52:34 +0000699
jadmanski0afbb632008-06-06 21:10:57 +0000700 # now kill any remaining autoserv processes
showard170873e2009-01-07 00:22:26 +0000701 for process in orphans.itervalues():
showardb18134f2009-03-20 20:52:18 +0000702 logging.info('Killing orphan %s', process)
showard170873e2009-01-07 00:22:26 +0000703 _drone_manager.kill_process(process)
jadmanski0afbb632008-06-06 21:10:57 +0000704
showard170873e2009-01-07 00:22:26 +0000705
706 def _recover_aborting_entries(self):
707 queue_entries = HostQueueEntry.fetch(
708 where='status IN ("Abort", "Aborting")')
jadmanski0afbb632008-06-06 21:10:57 +0000709 for queue_entry in queue_entries:
showardb18134f2009-03-20 20:52:18 +0000710 logging.info('Recovering aborting QE %s', queue_entry)
showard170873e2009-01-07 00:22:26 +0000711 agent = queue_entry.abort(self)
jadmanski0afbb632008-06-06 21:10:57 +0000712
showard97aed502008-11-04 02:01:24 +0000713
showard170873e2009-01-07 00:22:26 +0000714 def _requeue_other_active_entries(self):
715 queue_entries = HostQueueEntry.fetch(
716 where='active AND NOT complete AND status != "Pending"')
717 for queue_entry in queue_entries:
718 if self.get_agents_for_entry(queue_entry):
719 # entry has already been recovered
720 continue
showardb18134f2009-03-20 20:52:18 +0000721 logging.info('Requeuing active QE %s (status=%s)', queue_entry,
722 queue_entry.status)
showard170873e2009-01-07 00:22:26 +0000723 if queue_entry.host:
724 tasks = queue_entry.host.reverify_tasks()
725 self.add_agent(Agent(tasks))
726 agent = queue_entry.requeue()
727
728
729 def _reverify_remaining_hosts(self):
showard45ae8192008-11-05 19:32:53 +0000730 # reverify hosts that were in the middle of verify, repair or cleanup
jadmanski0afbb632008-06-06 21:10:57 +0000731 self._reverify_hosts_where("""(status = 'Repairing' OR
732 status = 'Verifying' OR
showard170873e2009-01-07 00:22:26 +0000733 status = 'Cleaning')""")
jadmanski0afbb632008-06-06 21:10:57 +0000734
showard170873e2009-01-07 00:22:26 +0000735 # recover "Running" hosts with no active queue entries, although this
736 # should never happen
737 message = ('Recovering running host %s - this probably indicates a '
738 'scheduler bug')
jadmanski0afbb632008-06-06 21:10:57 +0000739 self._reverify_hosts_where("""status = 'Running' AND
740 id NOT IN (SELECT host_id
741 FROM host_queue_entries
742 WHERE active)""",
743 print_message=message)
mblighbb421852008-03-11 22:36:16 +0000744
745
jadmanski0afbb632008-06-06 21:10:57 +0000746 def _reverify_hosts_where(self, where,
showard170873e2009-01-07 00:22:26 +0000747 print_message='Reverifying host %s'):
748 full_where='locked = 0 AND invalid = 0 AND ' + where
749 for host in Host.fetch(where=full_where):
750 if self.host_has_agent(host):
751 # host has already been recovered in some way
jadmanski0afbb632008-06-06 21:10:57 +0000752 continue
showard170873e2009-01-07 00:22:26 +0000753 if print_message:
showardb18134f2009-03-20 20:52:18 +0000754 logging.info(print_message, host.hostname)
showard170873e2009-01-07 00:22:26 +0000755 tasks = host.reverify_tasks()
756 self.add_agent(Agent(tasks))
mbligh36768f02008-02-22 18:28:33 +0000757
758
showard97aed502008-11-04 02:01:24 +0000759 def _recover_parsing_entries(self):
showard2bab8f42008-11-12 18:15:22 +0000760 recovered_entry_ids = set()
showard97aed502008-11-04 02:01:24 +0000761 for entry in HostQueueEntry.fetch(where='status = "Parsing"'):
showard2bab8f42008-11-12 18:15:22 +0000762 if entry.id in recovered_entry_ids:
763 continue
764 queue_entries = entry.job.get_group_entries(entry)
showard170873e2009-01-07 00:22:26 +0000765 recovered_entry_ids = recovered_entry_ids.union(
766 entry.id for entry in queue_entries)
showardb18134f2009-03-20 20:52:18 +0000767 logging.info('Recovering parsing entries %s',
768 (', '.join(str(entry) for entry in queue_entries)))
showard97aed502008-11-04 02:01:24 +0000769
770 reparse_task = FinalReparseTask(queue_entries)
showard170873e2009-01-07 00:22:26 +0000771 self.add_agent(Agent([reparse_task], num_processes=0))
showard97aed502008-11-04 02:01:24 +0000772
773
jadmanski0afbb632008-06-06 21:10:57 +0000774 def _recover_hosts(self):
775 # recover "Repair Failed" hosts
776 message = 'Reverifying dead host %s'
777 self._reverify_hosts_where("status = 'Repair Failed'",
778 print_message=message)
mbligh62ba2ed2008-04-30 17:09:25 +0000779
780
showard3bb499f2008-07-03 19:42:20 +0000781 def _abort_timed_out_jobs(self):
782 """
783 Aborts all jobs that have timed out and not completed
784 """
showarda3ab0d52008-11-03 19:03:47 +0000785 query = models.Job.objects.filter(hostqueueentry__complete=False).extra(
786 where=['created_on + INTERVAL timeout HOUR < NOW()'])
787 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000788 logging.warning('Aborting job %d due to job timeout', job.id)
showarda3ab0d52008-11-03 19:03:47 +0000789 job.abort(None)
showard3bb499f2008-07-03 19:42:20 +0000790
791
showard98863972008-10-29 21:14:56 +0000792 def _abort_jobs_past_synch_start_timeout(self):
793 """
794 Abort synchronous jobs that are past the start timeout (from global
795 config) and are holding a machine that's in everyone.
796 """
797 timeout_delta = datetime.timedelta(
showardd1ee1dd2009-01-07 21:33:08 +0000798 minutes=scheduler_config.config.synch_job_start_timeout_minutes)
showard98863972008-10-29 21:14:56 +0000799 timeout_start = datetime.datetime.now() - timeout_delta
800 query = models.Job.objects.filter(
showard98863972008-10-29 21:14:56 +0000801 created_on__lt=timeout_start,
802 hostqueueentry__status='Pending',
showardd9ac4452009-02-07 02:04:37 +0000803 hostqueueentry__host__aclgroup__name='Everyone')
showard98863972008-10-29 21:14:56 +0000804 for job in query.distinct():
showardb18134f2009-03-20 20:52:18 +0000805 logging.warning('Aborting job %d due to start timeout', job.id)
showardff059d72008-12-03 18:18:53 +0000806 entries_to_abort = job.hostqueueentry_set.exclude(
807 status=models.HostQueueEntry.Status.RUNNING)
808 for queue_entry in entries_to_abort:
809 queue_entry.abort(None)
showard98863972008-10-29 21:14:56 +0000810
811
    def _clear_inactive_blocks(self):
        """
        Clear out blocks for all completed jobs.

        Deletes ineligible_host_queues rows whose job has no incomplete
        queue entries left.
        """
        # this would be simpler using NOT IN (subquery), but MySQL
        # treats all IN subqueries as dependent, so this optimizes much
        # better
        _db.execute("""
            DELETE ihq FROM ineligible_host_queues ihq
            LEFT JOIN (SELECT DISTINCT job_id FROM host_queue_entries
                       WHERE NOT complete) hqe
            USING (job_id) WHERE hqe.job_id IS NULL""")
showard04c82c52008-05-29 19:38:12 +0000824
825
    def _get_pending_queue_entries(self):
        """
        Fetch all queue entries eligible for scheduling.

        @returns a list of HostQueueEntry objects, highest priority first.
        """
        # prioritize by job priority, then non-metahost over metahost, then FIFO
        return list(HostQueueEntry.fetch(
            joins='INNER JOIN jobs ON (job_id=jobs.id)',
            where='NOT complete AND NOT active AND status="Queued"',
            order_by='jobs.priority DESC, meta_host, job_id'))
mbligh36768f02008-02-22 18:28:33 +0000832
833
showard89f84db2009-03-12 20:39:13 +0000834 def _refresh_pending_queue_entries(self):
835 """
836 Lookup the pending HostQueueEntries and call our HostScheduler
837 refresh() method given that list. Return the list.
838
839 @returns A list of pending HostQueueEntries sorted in priority order.
840 """
showard63a34772008-08-18 19:32:50 +0000841 queue_entries = self._get_pending_queue_entries()
842 if not queue_entries:
showard89f84db2009-03-12 20:39:13 +0000843 return []
showardb95b1bd2008-08-15 18:11:04 +0000844
showard63a34772008-08-18 19:32:50 +0000845 self._host_scheduler.refresh(queue_entries)
showardb95b1bd2008-08-15 18:11:04 +0000846
showard89f84db2009-03-12 20:39:13 +0000847 return queue_entries
848
849
850 def _schedule_atomic_group(self, queue_entry):
851 """
852 Schedule the given queue_entry on an atomic group of hosts.
853
854 Returns immediately if there are insufficient available hosts.
855
856 Creates new HostQueueEntries based off of queue_entry for the
857 scheduled hosts and starts them all running.
858 """
859 # This is a virtual host queue entry representing an entire
860 # atomic group, find a group and schedule their hosts.
861 group_hosts = self._host_scheduler.find_eligible_atomic_group(
862 queue_entry)
863 if not group_hosts:
864 return
865 # The first assigned host uses the original HostQueueEntry
866 group_queue_entries = [queue_entry]
867 for assigned_host in group_hosts[1:]:
868 # Create a new HQE for every additional assigned_host.
869 new_hqe = HostQueueEntry.clone(queue_entry)
870 new_hqe.save()
871 group_queue_entries.append(new_hqe)
872 assert len(group_queue_entries) == len(group_hosts)
873 for queue_entry, host in itertools.izip(group_queue_entries,
874 group_hosts):
875 self._run_queue_entry(queue_entry, host)
876
877
878 def _schedule_new_jobs(self):
879 queue_entries = self._refresh_pending_queue_entries()
880 if not queue_entries:
881 return
882
showard63a34772008-08-18 19:32:50 +0000883 for queue_entry in queue_entries:
showard89f84db2009-03-12 20:39:13 +0000884 if (queue_entry.atomic_group_id is None or
885 queue_entry.host_id is not None):
886 assigned_host = self._host_scheduler.find_eligible_host(
887 queue_entry)
888 if assigned_host:
889 self._run_queue_entry(queue_entry, assigned_host)
890 else:
891 self._schedule_atomic_group(queue_entry)
showardb95b1bd2008-08-15 18:11:04 +0000892
893
894 def _run_queue_entry(self, queue_entry, host):
895 agent = queue_entry.run(assigned_host=host)
showard170873e2009-01-07 00:22:26 +0000896 # in some cases (synchronous jobs with run_verify=False), agent may be
897 # None
showard9976ce92008-10-15 20:28:13 +0000898 if agent:
899 self.add_agent(agent)
mblighd5c95802008-03-05 00:33:46 +0000900
901
jadmanski0afbb632008-06-06 21:10:57 +0000902 def _find_aborting(self):
jadmanski0afbb632008-06-06 21:10:57 +0000903 for entry in queue_entries_to_abort():
showard170873e2009-01-07 00:22:26 +0000904 agents_to_abort = list(self.get_agents_for_entry(entry))
showard1be97432008-10-17 15:30:45 +0000905 for agent in agents_to_abort:
906 self.remove_agent(agent)
907
showard170873e2009-01-07 00:22:26 +0000908 entry.abort(self, agents_to_abort)
jadmanski0afbb632008-06-06 21:10:57 +0000909
910
    def _can_start_agent(self, agent, num_started_this_cycle,
                         have_reached_limit):
        """
        Decide whether an agent may start this cycle, applying total-process
        and per-cycle throttling.  The order of these checks is significant.

        @param agent: the Agent being considered.
        @param num_started_this_cycle: processes already started this cycle.
        @param have_reached_limit: True once any agent was throttled this
                cycle; blocks all later nonzero-process agents.
        @returns True if the agent may start now.
        """
        # always allow zero-process agents to run
        if agent.num_processes == 0:
            return True
        # don't allow any nonzero-process agents to run after we've reached a
        # limit (this avoids starvation of many-process agents)
        if have_reached_limit:
            return False
        # total process throttling
        if agent.num_processes > _drone_manager.max_runnable_processes():
            return False
        # if a single agent exceeds the per-cycle throttling, still allow it to
        # run when it's the first agent in the cycle
        if num_started_this_cycle == 0:
            return True
        # per-cycle throttling
        if (num_started_this_cycle + agent.num_processes >
            scheduler_config.config.max_processes_started_per_cycle):
            return False
        return True
932
933
    def _handle_agents(self):
        """
        Tick every agent: remove finished agents, start runnable ones
        (subject to throttling), and poll those already running.
        """
        num_started_this_cycle = 0
        have_reached_limit = False
        # iterate over copy, so we can remove agents during iteration
        for agent in list(self._agents):
            if agent.is_done():
                logging.info("agent finished")
                self.remove_agent(agent)
                continue
            if not agent.is_running():
                # not yet started -- check throttling before letting its
                # first tick() launch processes
                if not self._can_start_agent(agent, num_started_this_cycle,
                                             have_reached_limit):
                    have_reached_limit = True
                    continue
                num_started_this_cycle += agent.num_processes
            agent.tick()
        logging.info('%d running processes',
                     _drone_manager.total_running_processes())
mbligh36768f02008-02-22 18:28:33 +0000952
953
showardfa8629c2008-11-04 16:51:23 +0000954 def _check_for_db_inconsistencies(self):
955 query = models.HostQueueEntry.objects.filter(active=True, complete=True)
956 if query.count() != 0:
957 subject = ('%d queue entries found with active=complete=1'
958 % query.count())
959 message = '\n'.join(str(entry.get_object_dict())
960 for entry in query[:50])
961 if len(query) > 50:
962 message += '\n(truncated)\n'
963
showardb18134f2009-03-20 20:52:18 +0000964 logging.error(subject)
showard170873e2009-01-07 00:22:26 +0000965 email_manager.manager.enqueue_notify_email(subject, message)
showardfa8629c2008-11-04 16:51:23 +0000966
967
class PidfileRunMonitor(object):
    """
    Tracks a single autoserv process through its pidfile via the drone
    manager.

    Client must call either run() to start a new process or
    attach_to_existing_process().
    """

    class _PidfileException(Exception):
        """
        Raised when there's some unexpected behavior with the pid file, but only
        used internally (never allowed to escape this class).
        """


    def __init__(self):
        # True once we've given up on the process reporting an exit status
        self.lost_process = False
        self._start_time = None
        self.pidfile_id = None
        self._state = drone_manager.PidfileContents()


    def _add_nice_command(self, command, nice_level):
        # wrap the command with nice; a level of None (or 0) leaves the
        # command untouched
        if not nice_level:
            return command
        return ['nice', '-n', str(nice_level)] + command


    def _set_start_time(self):
        self._start_time = time.time()


    def run(self, command, working_directory, nice_level=None, log_file=None,
            pidfile_name=None, paired_with_pidfile=None):
        """
        Launch a new process through the drone manager.

        @param command: argv list to execute.
        @param working_directory: directory to run in.
        @param nice_level: optional nice level to run the command under.
        @param log_file: optional path for process output.
        @param pidfile_name: optional alternate pidfile name.
        @param paired_with_pidfile: optional pidfile id of a process to run
                alongside.
        """
        assert command is not None
        # use the shared helper rather than duplicating the nice-wrapping
        # logic inline (the previous inline copy was inconsistent with it)
        command = self._add_nice_command(command, nice_level)
        self._set_start_time()
        self.pidfile_id = _drone_manager.execute_command(
            command, working_directory, log_file=log_file,
            pidfile_name=pidfile_name, paired_with_pidfile=paired_with_pidfile)


    def attach_to_existing_process(self, execution_tag):
        """Begin monitoring a process already launched under execution_tag."""
        self._set_start_time()
        self.pidfile_id = _drone_manager.get_pidfile_id_from(execution_tag)
        _drone_manager.register_pidfile(self.pidfile_id)


    def kill(self):
        """Kill the monitored process, if one exists."""
        if self.has_process():
            _drone_manager.kill_process(self.get_process())


    def has_process(self):
        """Return True if a process has been seen in the pidfile."""
        self._get_pidfile_info()
        return self._state.process is not None


    def get_process(self):
        """Return the monitored process; asserts one exists."""
        self._get_pidfile_info()
        assert self._state.process is not None
        return self._state.process


    def _read_pidfile(self, use_second_read=False):
        # Refresh self._state from the pidfile; raises _PidfileException on
        # invalid contents.
        assert self.pidfile_id is not None, (
            'You must call run() or attach_to_existing_process()')
        contents = _drone_manager.get_pidfile_contents(
            self.pidfile_id, use_second_read=use_second_read)
        if contents.is_invalid():
            self._state = drone_manager.PidfileContents()
            raise self._PidfileException(contents)
        self._state = contents


    def _handle_pidfile_error(self, error, message=''):
        # log, email, and give up on the process
        message = error + '\nProcess: %s\nPidfile: %s\n%s' % (
            self._state.process, self.pidfile_id, message)
        logging.info(message)
        email_manager.manager.enqueue_notify_email(error, message)
        self.on_lost_process(self._state.process)


    def _get_pidfile_info_helper(self):
        if self.lost_process:
            return

        self._read_pidfile()

        if self._state.process is None:
            self._handle_no_process()
            return

        if self._state.exit_status is None:
            # double check whether or not autoserv is running
            if _drone_manager.is_process_running(self._state.process):
                return

            # pid but no running process - maybe process *just* exited
            self._read_pidfile(use_second_read=True)
            if self._state.exit_status is None:
                # autoserv exited without writing an exit code
                # to the pidfile
                self._handle_pidfile_error(
                    'autoserv died without writing exit code')


    def _get_pidfile_info(self):
        """\
        After completion, self._state will contain:
        pid=None, exit_status=None if autoserv has not yet run
        pid!=None, exit_status=None if autoserv is running
        pid!=None, exit_status!=None if autoserv has completed
        """
        try:
            self._get_pidfile_info_helper()
        except self._PidfileException:
            # the exception object itself isn't needed; the traceback carries
            # the details
            self._handle_pidfile_error('Pidfile error', traceback.format_exc())


    def _handle_no_process(self):
        """\
        Called when no pidfile is found or no pid is in the pidfile.
        """
        message = 'No pid found at %s' % self.pidfile_id
        logging.info(message)
        # only give up once the process has had PIDFILE_TIMEOUT seconds to
        # write its pidfile
        if time.time() - self._start_time > PIDFILE_TIMEOUT:
            email_manager.manager.enqueue_notify_email(
                'Process has failed to write pidfile', message)
            self.on_lost_process()


    def on_lost_process(self, process=None):
        """\
        Called when autoserv has exited without writing an exit status,
        or we've timed out waiting for autoserv to write a pid to the
        pidfile. In either case, we just return failure and the caller
        should signal some kind of warning.

        process is unimportant here, as it shouldn't be used by anyone.
        """
        self.lost_process = True
        self._state.process = process
        self._state.exit_status = 1
        self._state.num_tests_failed = 0


    def exit_code(self):
        """Return the exit status, or None while still running."""
        self._get_pidfile_info()
        return self._state.exit_status


    def num_tests_failed(self):
        """Return the failed-test count; asserts it has been written."""
        self._get_pidfile_info()
        assert self._state.num_tests_failed is not None
        return self._state.num_tests_failed
mblighbb421852008-03-11 22:36:16 +00001123
1124
mbligh36768f02008-02-22 18:28:33 +00001125class Agent(object):
showard170873e2009-01-07 00:22:26 +00001126 def __init__(self, tasks, num_processes=1):
jadmanski0afbb632008-06-06 21:10:57 +00001127 self.active_task = None
1128 self.queue = Queue.Queue(0)
1129 self.dispatcher = None
showard4c5374f2008-09-04 17:02:56 +00001130 self.num_processes = num_processes
jadmanski0afbb632008-06-06 21:10:57 +00001131
showard170873e2009-01-07 00:22:26 +00001132 self.queue_entry_ids = self._union_ids(task.queue_entry_ids
1133 for task in tasks)
1134 self.host_ids = self._union_ids(task.host_ids for task in tasks)
1135
jadmanski0afbb632008-06-06 21:10:57 +00001136 for task in tasks:
1137 self.add_task(task)
mbligh36768f02008-02-22 18:28:33 +00001138
1139
showard170873e2009-01-07 00:22:26 +00001140 def _union_ids(self, id_lists):
1141 return set(itertools.chain(*id_lists))
1142
1143
jadmanski0afbb632008-06-06 21:10:57 +00001144 def add_task(self, task):
1145 self.queue.put_nowait(task)
1146 task.agent = self
mbligh36768f02008-02-22 18:28:33 +00001147
1148
jadmanski0afbb632008-06-06 21:10:57 +00001149 def tick(self):
showard21baa452008-10-21 00:08:39 +00001150 while not self.is_done():
1151 if self.active_task and not self.active_task.is_done():
1152 self.active_task.poll()
1153 if not self.active_task.is_done():
1154 return
1155 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001156
1157
jadmanski0afbb632008-06-06 21:10:57 +00001158 def _next_task(self):
showardb18134f2009-03-20 20:52:18 +00001159 logging.info("agent picking task")
jadmanski0afbb632008-06-06 21:10:57 +00001160 if self.active_task:
1161 assert self.active_task.is_done()
mbligh36768f02008-02-22 18:28:33 +00001162
jadmanski0afbb632008-06-06 21:10:57 +00001163 if not self.active_task.success:
1164 self.on_task_failure()
mblighe2586682008-02-29 22:45:46 +00001165
jadmanski0afbb632008-06-06 21:10:57 +00001166 self.active_task = None
1167 if not self.is_done():
1168 self.active_task = self.queue.get_nowait()
1169 if self.active_task:
1170 self.active_task.start()
mbligh36768f02008-02-22 18:28:33 +00001171
1172
jadmanski0afbb632008-06-06 21:10:57 +00001173 def on_task_failure(self):
1174 self.queue = Queue.Queue(0)
1175 for task in self.active_task.failure_tasks:
1176 self.add_task(task)
mbligh16c722d2008-03-05 00:58:44 +00001177
mblighe2586682008-02-29 22:45:46 +00001178
showard4c5374f2008-09-04 17:02:56 +00001179 def is_running(self):
jadmanski0afbb632008-06-06 21:10:57 +00001180 return self.active_task is not None
showardec113162008-05-08 00:52:49 +00001181
1182
jadmanski0afbb632008-06-06 21:10:57 +00001183 def is_done(self):
mblighd876f452008-12-03 15:09:17 +00001184 return self.active_task is None and self.queue.empty()
mbligh36768f02008-02-22 18:28:33 +00001185
1186
jadmanski0afbb632008-06-06 21:10:57 +00001187 def start(self):
1188 assert self.dispatcher
mbligh36768f02008-02-22 18:28:33 +00001189
jadmanski0afbb632008-06-06 21:10:57 +00001190 self._next_task()
mbligh36768f02008-02-22 18:28:33 +00001191
jadmanski0afbb632008-06-06 21:10:57 +00001192
class AgentTask(object):
    """
    A single unit of work executed by an Agent, optionally backed by a
    process run through a PidfileRunMonitor.
    """
    def __init__(self, cmd, working_directory=None, failure_tasks=None):
        """
        @param cmd: argv list to run, or None for process-less tasks.
        @param working_directory: directory the process runs in.
        @param failure_tasks: tasks enqueued on the agent if this task fails.
        """
        self.done = False
        # avoid the shared-mutable-default pitfall (was failure_tasks=[]):
        # each instance gets its own list
        self.failure_tasks = failure_tasks or []
        self.started = False
        self.cmd = cmd
        self._working_directory = working_directory
        self.task = None
        self.agent = None
        self.monitor = None
        self.success = None
        self.queue_entry_ids = []
        self.host_ids = []
        self.log_file = None


    def _set_ids(self, host=None, queue_entries=None):
        # queue_entries may be [None] (e.g. a repair with no queue entry);
        # in that case fall back to the host id alone
        if queue_entries and queue_entries != [None]:
            self.host_ids = [entry.host.id for entry in queue_entries]
            self.queue_entry_ids = [entry.id for entry in queue_entries]
        else:
            assert host
            self.host_ids = [host.id]


    def poll(self):
        """Check process status; a task with no monitor is marked failed."""
        if self.monitor:
            self.tick(self.monitor.exit_code())
        else:
            self.finished(False)


    def tick(self, exit_code):
        # None means the process hasn't exited yet; check again next cycle
        if exit_code is None:
            return
        self.finished(exit_code == 0)


    def is_done(self):
        return self.done


    def finished(self, success):
        """Mark the task complete and run its epilog."""
        self.done = True
        self.success = success
        self.epilog()


    def prolog(self):
        """Hook run once before the task starts."""
        pass


    def create_temp_resultsdir(self, suffix=''):
        # NOTE(review): suffix is accepted for caller compatibility but is
        # currently unused -- all temp paths are created under 'agent_task'
        self.temp_results_dir = _drone_manager.get_temporary_path('agent_task')


    def cleanup(self):
        # copy the task's log file into the results repository, if any
        if self.monitor and self.log_file:
            _drone_manager.copy_to_results_repository(
                self.monitor.get_process(), self.log_file)


    def epilog(self):
        """Hook run once after the task finishes (success or failure)."""
        self.cleanup()


    def start(self):
        assert self.agent

        if not self.started:
            self.prolog()
            self.run()

        self.started = True


    def abort(self):
        """Kill the underlying process (if any) and mark the task done."""
        if self.monitor:
            self.monitor.kill()
        self.done = True
        self.cleanup()


    def set_host_log_file(self, base_name, host):
        # timestamped per-host log path under the hosts/ results tree
        filename = '%s.%s' % (time.time(), base_name)
        self.log_file = os.path.join('hosts', host.hostname, filename)


    def _get_consistent_execution_tag(self, queue_entries):
        """Return the shared execution tag, asserting all entries agree."""
        first_execution_tag = queue_entries[0].execution_tag()
        for queue_entry in queue_entries[1:]:
            assert queue_entry.execution_tag() == first_execution_tag, (
                '%s (%s) != %s (%s)' % (queue_entry.execution_tag(),
                                        queue_entry,
                                        first_execution_tag,
                                        queue_entries[0]))
        return first_execution_tag


    def _copy_and_parse_results(self, queue_entries):
        """Copy results to the repository and schedule a final reparse."""
        assert len(queue_entries) > 0
        assert self.monitor
        execution_tag = self._get_consistent_execution_tag(queue_entries)
        results_path = execution_tag + '/'
        _drone_manager.copy_to_results_repository(self.monitor.get_process(),
                                                  results_path)

        reparse_task = FinalReparseTask(queue_entries)
        self.agent.dispatcher.add_agent(Agent([reparse_task], num_processes=0))


    def run(self):
        if self.cmd:
            self.monitor = PidfileRunMonitor()
            self.monitor.run(self.cmd, self._working_directory,
                             nice_level=AUTOSERV_NICE_LEVEL,
                             log_file=self.log_file)
mbligh36768f02008-02-22 18:28:33 +00001315
1316
class RepairTask(AgentTask):
    """Runs autoserv -R to repair a host, optionally failing a queue entry."""
    def __init__(self, host, queue_entry=None):
        """\
        queue_entry: queue entry to mark failed if this repair fails.
        """
        protection = host_protections.Protection.get_string(host.protection)
        # normalize the protection name
        protection = host_protections.Protection.get_attr_name(protection)

        self.host = host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.repair')
        cmd = [_autoserv_path, '-p', '-R', '-m', host.hostname,
               '-r', _drone_manager.absolute_path(self.temp_results_dir),
               '--host-protection', protection]
        super(RepairTask, self).__init__(cmd, self.temp_results_dir)

        # _set_ids is called only once, after super().__init__ (which resets
        # the id lists); the earlier duplicate call was clobbered anyway
        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('repair', self.host)


    def prolog(self):
        logging.info("repair_task starting")
        self.host.set_status('Repairing')
        # release the entry back to the queue while its host is repaired
        if self.queue_entry:
            self.queue_entry.requeue()


    def _fail_queue_entry(self):
        """Record the repair logs against the entry and fail it."""
        assert self.queue_entry
        self.queue_entry.set_execution_subdir()
        # copy results logs into the normal place for job results
        _drone_manager.copy_results_on_drone(
            self.monitor.get_process(),
            source_path=self.temp_results_dir + '/',
            destination_path=self.queue_entry.execution_tag() + '/')

        self._copy_and_parse_results([self.queue_entry])
        self.queue_entry.handle_host_failure()


    def epilog(self):
        super(RepairTask, self).epilog()
        if self.success:
            self.host.set_status('Ready')
        else:
            self.host.set_status('Repair Failed')
            # a non-metahost entry is pinned to this host, so it fails along
            # with the repair
            if self.queue_entry and not self.queue_entry.meta_host:
                self._fail_queue_entry()
mbligh36768f02008-02-22 18:28:33 +00001368
1369
class PreJobTask(AgentTask):
    """
    Base for tasks run ahead of a job (e.g. verify): when such a task fails
    for a non-metahost entry, its log is copied into the job's results.
    """
    def epilog(self):
        super(PreJobTask, self).epilog()
        # nothing to copy unless a concrete (non-metahost) entry failed
        if not self.queue_entry or self.success or self.queue_entry.meta_host:
            return
        self.queue_entry.set_execution_subdir()
        destination = os.path.join(self.queue_entry.execution_tag(),
                                   os.path.basename(self.log_file))
        _drone_manager.copy_to_results_repository(
            self.monitor.get_process(), self.log_file,
            destination_path=destination)
showard8fe93b52008-11-18 17:53:22 +00001382
1383
class VerifyTask(PreJobTask):
    """Runs ``autoserv -v`` to confirm that a host is in working order.

    Constructed with exactly one of ``queue_entry`` or ``host``.  If the
    verify fails, a RepairTask is queued as the follow-up failure task.
    """
    def __init__(self, queue_entry=None, host=None):
        # Exactly one of the two arguments must be supplied.
        assert bool(queue_entry) != bool(host)
        self.host = host or queue_entry.host
        self.queue_entry = queue_entry

        self.create_temp_resultsdir('.verify')
        autoserv_cmd = [_autoserv_path, '-p', '-v', '-m', self.host.hostname,
                        '-r',
                        _drone_manager.absolute_path(self.temp_results_dir)]
        repair_tasks = [RepairTask(self.host, queue_entry=queue_entry)]
        super(VerifyTask, self).__init__(autoserv_cmd, self.temp_results_dir,
                                         failure_tasks=repair_tasks)

        self.set_host_log_file('verify', self.host)
        # Pass the original host argument here (may be None when built
        # from a queue entry), preserving the ids recorded in each case.
        self._set_ids(host=host, queue_entries=[queue_entry])


    def prolog(self):
        super(VerifyTask, self).prolog()
        logging.info("starting verify on %s", self.host.hostname)
        if self.queue_entry:
            self.queue_entry.set_status('Verifying')
        self.host.set_status('Verifying')


    def epilog(self):
        super(VerifyTask, self).epilog()
        # On failure, leave the host's status alone; the RepairTask queued
        # at construction time takes over.
        if self.success:
            self.host.set_status('Ready')
mbligh36768f02008-02-22 18:28:33 +00001414
1415
class QueueTask(AgentTask):
    """Runs autoserv for a job across one or more host queue entries.

    Writes job-level and per-host keyval files before the job starts,
    finalizes results (keyvals, parsing, lost-process errors) when it
    ends, and schedules post-job cleanups/reboots.
    """
    def __init__(self, job, queue_entries, cmd):
        self.job = job
        self.queue_entries = queue_entries
        super(QueueTask, self).__init__(cmd, self._execution_tag())
        self._set_ids(queue_entries=queue_entries)


    def _format_keyval(self, key, value):
        """Render a single key=value keyval line."""
        return '%s=%s' % (key, value)


    def _keyval_path(self):
        """Path of the job-level keyval file within the execution dir."""
        return os.path.join(self._execution_tag(), 'keyval')


    def _write_keyvals_before_job_helper(self, keyval_dict, keyval_path):
        """Attach keyval_dict as a file at keyval_path before the job runs."""
        keyval_contents = '\n'.join(self._format_keyval(key, value)
                                    for key, value in keyval_dict.iteritems())
        # always end with a newline to allow additional keyvals to be written
        keyval_contents += '\n'
        _drone_manager.attach_file_to_execution(self._execution_tag(),
                                                keyval_contents,
                                                file_path=keyval_path)


    def _write_keyvals_before_job(self, keyval_dict):
        self._write_keyvals_before_job_helper(keyval_dict, self._keyval_path())


    def _write_keyval_after_job(self, field, value):
        """Append one keyval line; requires the job process to exist."""
        assert self.monitor and self.monitor.has_process()
        _drone_manager.write_lines_to_file(
            self._keyval_path(), [self._format_keyval(field, value)],
            paired_with_process=self.monitor.get_process())


    def _write_host_keyvals(self, host):
        """Write platform/labels keyvals for one host under host_keyvals/."""
        keyval_path = os.path.join(self._execution_tag(), 'host_keyvals',
                                   host.hostname)
        platform, all_labels = host.platform_and_labels()
        keyval_dict = dict(platform=platform, labels=','.join(all_labels))
        self._write_keyvals_before_job_helper(keyval_dict, keyval_path)


    def _execution_tag(self):
        # all entries of one execution share the same tag
        return self.queue_entries[0].execution_tag()


    def prolog(self):
        """Record queue time and host keyvals; mark entries/hosts Running."""
        queued = int(time.mktime(self.job.created_on.timetuple()))
        self._write_keyvals_before_job({'job_queued': queued})
        for queue_entry in self.queue_entries:
            self._write_host_keyvals(queue_entry.host)
            queue_entry.set_status('Running')
            queue_entry.host.set_status('Running')
            queue_entry.host.update_field('dirty', 1)
        if self.job.synch_count == 1:
            assert len(self.queue_entries) == 1
            self.job.write_to_machines_file(self.queue_entries[0])


    def _write_lost_process_error_file(self):
        error_file_path = os.path.join(self._execution_tag(), 'job_failure')
        _drone_manager.write_lines_to_file(error_file_path,
                                           [_LOST_PROCESS_ERROR])


    def _finish_task(self, success):
        """Finalize results; fail all entries if the process was lost."""
        if self.monitor.has_process():
            self._write_keyval_after_job("job_finished", int(time.time()))
            self._copy_and_parse_results(self.queue_entries)

        if self.monitor.lost_process:
            self._write_lost_process_error_file()
            for queue_entry in self.queue_entries:
                queue_entry.set_status(models.HostQueueEntry.Status.FAILED)


    def _write_status_comment(self, comment):
        _drone_manager.write_lines_to_file(
            os.path.join(self._execution_tag(), 'status.log'),
            ['INFO\t----\t----\t' + comment],
            paired_with_process=self.monitor.get_process())


    def _log_abort(self):
        """Record who aborted the job and when, in keyvals and status.log."""
        if not self.monitor or not self.monitor.has_process():
            return

        # build up sets of all the aborted_by and aborted_on values
        aborted_by, aborted_on = set(), set()
        for queue_entry in self.queue_entries:
            if queue_entry.aborted_by:
                aborted_by.add(queue_entry.aborted_by)
                t = int(time.mktime(queue_entry.aborted_on.timetuple()))
                aborted_on.add(t)

        # extract some actual, unique aborted by value and write it out
        assert len(aborted_by) <= 1
        if len(aborted_by) == 1:
            aborted_by_value = aborted_by.pop()
            aborted_on_value = max(aborted_on)
        else:
            aborted_by_value = 'autotest_system'
            aborted_on_value = int(time.time())

        self._write_keyval_after_job("aborted_by", aborted_by_value)
        self._write_keyval_after_job("aborted_on", aborted_on_value)

        aborted_on_string = str(datetime.datetime.fromtimestamp(
            aborted_on_value))
        self._write_status_comment('Job aborted by %s on %s' %
                                   (aborted_by_value, aborted_on_string))


    def abort(self):
        super(QueueTask, self).abort()
        self._log_abort()
        self._finish_task(False)


    def _reboot_hosts(self):
        """Queue cleanups (reboots) or free hosts per the job's reboot_after."""
        reboot_after = self.job.reboot_after
        do_reboot = False
        if reboot_after == models.RebootAfter.ALWAYS:
            do_reboot = True
        elif reboot_after == models.RebootAfter.IF_ALL_TESTS_PASSED:
            num_tests_failed = self.monitor.num_tests_failed()
            do_reboot = (self.success and num_tests_failed == 0)

        for queue_entry in self.queue_entries:
            if do_reboot:
                # don't pass the queue entry to the CleanupTask. if the cleanup
                # fails, the job doesn't care -- it's over.
                cleanup_task = CleanupTask(host=queue_entry.get_host())
                self.agent.dispatcher.add_agent(Agent([cleanup_task]))
            else:
                queue_entry.host.set_status('Ready')


    def epilog(self):
        super(QueueTask, self).epilog()
        self._finish_task(self.success)
        self._reboot_hosts()

        # fixed typo in the log message: "succes" -> "success"
        logging.info("queue_task finished with success=%s", self.success)
mbligh36768f02008-02-22 18:28:33 +00001563
1564
class RecoveryQueueTask(QueueTask):
    """A QueueTask attached to an autoserv process that is already running
    (used when the scheduler restarts and recovers in-flight jobs)."""
    def __init__(self, job, queue_entries, run_monitor):
        super(RecoveryQueueTask, self).__init__(job, queue_entries, cmd=None)
        self.run_monitor = run_monitor


    def run(self):
        # Adopt the pre-existing monitor instead of spawning a new process.
        self.monitor = self.run_monitor


    def prolog(self):
        # The job already started; its prolog must not run a second time.
        pass
mblighbb421852008-03-11 22:36:16 +00001578
1579
class CleanupTask(PreJobTask):
    """Runs ``autoserv --cleanup`` on a host.

    A failed cleanup triggers a RepairTask; a successful one marks the
    host Ready and clears its dirty flag.
    """
    def __init__(self, host=None, queue_entry=None):
        # Exactly one of host/queue_entry must be given.
        assert bool(host) ^ bool(queue_entry)
        if queue_entry:
            host = queue_entry.get_host()
        self.queue_entry = queue_entry
        self.host = host

        self.create_temp_resultsdir('.cleanup')
        self.cmd = [_autoserv_path, '-p', '--cleanup', '-m', host.hostname,
                    '-r', _drone_manager.absolute_path(self.temp_results_dir)]
        super(CleanupTask, self).__init__(
                self.cmd, self.temp_results_dir,
                failure_tasks=[RepairTask(host, queue_entry=queue_entry)])

        self._set_ids(host=host, queue_entries=[queue_entry])
        self.set_host_log_file('cleanup', self.host)


    def prolog(self):
        super(CleanupTask, self).prolog()
        logging.info("starting cleanup task for host: %s", self.host.hostname)
        self.host.set_status("Cleaning")


    def epilog(self):
        super(CleanupTask, self).epilog()
        if not self.success:
            return
        # A clean host is schedulable again and no longer dirty.
        self.host.set_status('Ready')
        self.host.update_field('dirty', 0)
1610
1611
class AbortTask(AgentTask):
    """Aborts the active tasks of a set of agents, then marks the queue
    entry Aborted."""
    def __init__(self, queue_entry, agents_to_abort):
        super(AbortTask, self).__init__('')
        self.queue_entry = queue_entry
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [queue_entry.id]
        self.agents_to_abort = agents_to_abort


    def prolog(self):
        logging.info("starting abort on host %s, job %s",
                     self.queue_entry.host_id, self.queue_entry.job_id)


    def epilog(self):
        super(AbortTask, self).epilog()
        self.queue_entry.set_status('Aborted')
        # An abort is considered to have completed successfully.
        self.success = True


    def run(self):
        for agent in self.agents_to_abort:
            active = agent.active_task
            if active:
                active.abort()
mbligh36768f02008-02-22 18:28:33 +00001636
1637
class FinalReparseTask(AgentTask):
    """Runs the TKO results parser over a finished job's results directory.

    Parses are throttled globally: at most
    scheduler_config.config.max_parse_processes may run at once, tracked
    by the class-level _num_running_parses counter.  run() and poll()
    keep retrying until a slot is available.
    """

    # number of parser processes currently running, shared by all instances
    _num_running_parses = 0

    def __init__(self, queue_entries):
        self._queue_entries = queue_entries
        # don't use _set_ids, since we don't want to set the host_ids
        self.queue_entry_ids = [entry.id for entry in queue_entries]
        self._parse_started = False

        assert len(queue_entries) > 0
        queue_entry = queue_entries[0]

        self._execution_tag = queue_entry.execution_tag()
        self._results_dir = _drone_manager.absolute_path(self._execution_tag)
        # attach to the finished autoserv process to read its exit status
        self._autoserv_monitor = PidfileRunMonitor()
        self._autoserv_monitor.attach_to_existing_process(self._execution_tag)
        self._final_status = self._determine_final_status()

        if _testing_mode:
            # NOTE: in testing mode the AgentTask constructor is skipped
            # entirely; only self.cmd is set.
            self.cmd = 'true'
        else:
            super(FinalReparseTask, self).__init__(
                cmd=self._generate_parse_command(),
                working_directory=self._execution_tag)

        self.log_file = os.path.join(self._execution_tag, '.parse.log')


    @classmethod
    def _increment_running_parses(cls):
        cls._num_running_parses += 1


    @classmethod
    def _decrement_running_parses(cls):
        cls._num_running_parses -= 1


    @classmethod
    def _can_run_new_parse(cls):
        # global throttle on the number of concurrent parser processes
        return (cls._num_running_parses <
                scheduler_config.config.max_parse_processes)


    def _determine_final_status(self):
        # we'll use a PidfileRunMonitor to read the autoserv exit status
        if self._autoserv_monitor.exit_code() == 0:
            return models.HostQueueEntry.Status.COMPLETED
        return models.HostQueueEntry.Status.FAILED


    def prolog(self):
        super(FinalReparseTask, self).prolog()
        # mark every entry as being parsed
        for queue_entry in self._queue_entries:
            queue_entry.set_status(models.HostQueueEntry.Status.PARSING)


    def epilog(self):
        super(FinalReparseTask, self).epilog()
        # final status was computed from autoserv's exit code in __init__
        for queue_entry in self._queue_entries:
            queue_entry.set_status(self._final_status)


    def _generate_parse_command(self):
        # command line for the results parser over this job's results dir
        return [_parser_path, '--write-pidfile', '-l', '2', '-r', '-o',
                self._results_dir]


    def poll(self):
        # override poll to keep trying to start until the parse count goes down
        # and we can, at which point we revert to default behavior
        if self._parse_started:
            super(FinalReparseTask, self).poll()
        else:
            self._try_starting_parse()


    def run(self):
        # override run() to not actually run unless we can
        self._try_starting_parse()


    def _try_starting_parse(self):
        # do nothing until a parse slot is free; poll() will retry
        if not self._can_run_new_parse():
            return

        # make sure we actually have results to parse
        # this should never happen in normal operation
        if not self._autoserv_monitor.has_process():
            email_manager.manager.enqueue_notify_email(
                'No results to parse',
                'No results to parse at %s' % self._autoserv_monitor.pidfile_id)
            self.finished(False)
            return

        # actually run the parse command
        self.monitor = PidfileRunMonitor()
        self.monitor.run(self.cmd, self._working_directory,
                         log_file=self.log_file,
                         pidfile_name='.parser_execute',
                         paired_with_pidfile=self._autoserv_monitor.pidfile_id)

        self._increment_running_parses()
        self._parse_started = True


    def finished(self, success):
        super(FinalReparseTask, self).finished(success)
        # only release a parse slot if we actually claimed one
        if self._parse_started:
            self._decrement_running_parses()
showard97aed502008-11-04 02:01:24 +00001748
1749
class SetEntryPendingTask(AgentTask):
    """Moves a queue entry to Pending and schedules any follow-up agent
    the entry produces."""
    def __init__(self, queue_entry):
        super(SetEntryPendingTask, self).__init__(cmd='')
        self._queue_entry = queue_entry
        self._set_ids(queue_entries=[queue_entry])


    def run(self):
        follow_up_agent = self._queue_entry.on_pending()
        if follow_up_agent:
            self.agent.dispatcher.add_agent(follow_up_agent)
        # This task completes immediately and always succeeds.
        self.finished(True)
1762
1763
class DBError(Exception):
    """Raised when a DBObject lookup fails to find a matching row."""
1766
1767
class DBObject(object):
    """A miniature object relational model for the database.

    Instances for a given (class, id) pair are cached in a weak-value
    dictionary so that e.g. many HostQueueEntries sharing one Job reuse
    a single Job instance.
    """

    # Subclasses MUST override these:
    _table_name = ''
    _fields = ()

    # A mapping from (type, id) to the instance of the object for that
    # particular id. This prevents us from creating new Job() and Host()
    # instances for every HostQueueEntry object that we instantiate as
    # multiple HQEs often share the same Job.
    _instances_by_type_and_id = weakref.WeakValueDictionary()
    _initialized = False


    def __new__(cls, id=None, **kwargs):
        """
        Look to see if we already have an instance for this particular type
        and id. If so, use it instead of creating a duplicate instance.
        """
        if id is not None:
            instance = cls._instances_by_type_and_id.get((cls, id))
            if instance:
                return instance
        return super(DBObject, cls).__new__(cls, id=id, **kwargs)


    def __init__(self, id=None, row=None, new_record=False, always_query=True):
        """
        @param id - Primary key of an existing row to load (exclusive
                with row).
        @param row - A full row of values, in _fields order (exclusive
                with id).
        @param new_record - True if this instance represents a row not yet
                in the database; it bypasses the instance cache.
        @param always_query - If False and this (cached) instance was
                already initialized, skip the re-query.
        @raises DBError if id is given but no matching row exists.
        """
        assert (bool(id) != bool(row))
        assert self._table_name, '_table_name must be defined in your class'
        assert self._fields, '_fields must be defined in your class'
        if not new_record:
            if self._initialized and not always_query:
                return  # We've already been initialized.
            if id is None:
                id = row[0]
            # Tell future constructors to use us instead of re-querying while
            # this instance is still around.
            self._instances_by_type_and_id[(type(self), id)] = self

        self.__table = self._table_name

        self.__new_record = new_record

        if row is None:
            sql = 'SELECT * FROM %s WHERE ID=%%s' % self.__table
            rows = _db.execute(sql, (id,))
            if not rows:
                raise DBError("row not found (table=%s, id=%s)"
                              % (self.__table, id))
            row = rows[0]

        if self._initialized:
            differences = self._compare_fields_in_row(row)
            if differences:
                # use logging for consistency with the rest of the scheduler
                logging.info('initialized %s %s instance requery is '
                             'updating: %s',
                             type(self), self.id, differences)
        self._update_fields_from_row(row)
        self._initialized = True


    @classmethod
    def _clear_instance_cache(cls):
        """Used for testing, clear the internal instance cache."""
        cls._instances_by_type_and_id.clear()


    def _assert_row_length(self, row):
        """Sanity-check that a fetched row matches the declared _fields."""
        assert len(row) == len(self._fields), (
            "table = %s, row = %s/%d, fields = %s/%d" % (
            self.__table, row, len(row), self._fields, len(self._fields)))


    def _compare_fields_in_row(self, row):
        """
        Given a row as returned by a SELECT query, compare it to our existing
        in memory fields.

        @param row - A sequence of values corresponding to fields named in
                The class attribute _fields.

        @returns A dictionary listing the differences keyed by field name
                containing tuples of (current_value, row_value).
        """
        self._assert_row_length(row)
        differences = {}
        for field, row_value in itertools.izip(self._fields, row):
            current_value = getattr(self, field)
            if current_value != row_value:
                differences[field] = (current_value, row_value)
        return differences


    def _update_fields_from_row(self, row):
        """
        Update our field attributes using a single row returned by SELECT.

        @param row - A sequence of values corresponding to fields named in
                the class fields list.
        """
        self._assert_row_length(row)

        self._valid_fields = set()
        for field, value in itertools.izip(self._fields, row):
            setattr(self, field, value)
            self._valid_fields.add(field)

        # 'id' is never updated through update_field()
        self._valid_fields.remove('id')


    def count(self, where, table = None):
        """Return the number of rows of `table` matching the WHERE clause."""
        if not table:
            table = self.__table

        rows = _db.execute("""
                SELECT count(*) FROM %s
                WHERE %s
        """ % (table, where))

        assert len(rows) == 1

        return int(rows[0][0])


    def update_field(self, field, value, condition=''):
        """
        Persist a single field change, skipping the write when the value
        is unchanged.

        @param condition - optional extra SQL appended to the WHERE clause.
        """
        assert field in self._valid_fields

        if getattr(self, field) == value:
            return

        query = "UPDATE %s SET %s = %%s WHERE id = %%s" % (self.__table, field)
        if condition:
            query += ' AND (%s)' % condition
        _db.execute(query, (value, self.id))

        setattr(self, field, value)


    def save(self):
        """Insert this instance as a new row (only if new_record was set)."""
        if self.__new_record:
            keys = self._fields[1:]        # avoid id
            columns = ','.join([str(key) for key in keys])
            # NOTE(review): values are interpolated with naive double-quoting
            # rather than driver parameters; unsafe if a value ever contains
            # a quote. Left as-is to preserve existing behavior.
            values = ['"%s"' % self.__dict__[key] for key in keys]
            values_str = ','.join(values)
            query = ('INSERT INTO %s (%s) VALUES (%s)' %
                     (self.__table, columns, values_str))
            _db.execute(query)
            # Update our id to the one the database just assigned to us.
            self.id = _db.execute('SELECT LAST_INSERT_ID()')[0][0]


    def delete(self):
        """Delete this row and evict the instance from the cache."""
        # bug fix: was (type(self), id) -- the *builtin* id -- so the cache
        # entry for this instance was never actually removed.
        self._instances_by_type_and_id.pop((type(self), self.id), None)
        self._initialized = False
        self._valid_fields.clear()
        query = 'DELETE FROM %s WHERE id=%%s' % self.__table
        _db.execute(query, (self.id,))


    @staticmethod
    def _prefix_with(string, prefix):
        """Return `string` prefixed by `prefix`, or falsy input unchanged."""
        if string:
            string = prefix + string
        return string


    @classmethod
    def fetch(cls, where='', params=(), joins='', order_by=''):
        """
        Construct instances of our class based on the given database query.

        @yields One class instance for each row fetched.
        """
        order_by = cls._prefix_with(order_by, 'ORDER BY ')
        where = cls._prefix_with(where, 'WHERE ')
        query = ('SELECT %(table)s.* FROM %(table)s %(joins)s '
                 '%(where)s %(order_by)s' % {'table' : cls._table_name,
                                             'joins' : joins,
                                             'where' : where,
                                             'order_by' : order_by})
        rows = _db.execute(query, params)
        for row in rows:
            yield cls(row=row)
mblighe2586682008-02-29 22:45:46 +00001951
mbligh36768f02008-02-22 18:28:33 +00001952
class IneligibleHostQueue(DBObject):
    """A row of ineligible_host_queues pairing a job with a host.

    Rows are created by HostQueueEntry.block_host() when a host is
    assigned; presumably this prevents the same job from being scheduled
    on that host again (see block_host/unblock_host usage).
    """
    # _fields must list the table's columns in order; see DBObject.
    _table_name = 'ineligible_host_queues'
    _fields = ('id', 'job_id', 'host_id')
showard04c82c52008-05-29 19:38:12 +00001956
1957
class AtomicGroup(DBObject):
    """A row of the atomic_groups table."""
    # _fields must list the table's columns in order; see DBObject.
    _table_name = 'atomic_groups'
    _fields = ('id', 'name', 'description', 'max_number_of_machines')
1961
1962
class Label(DBObject):
    """A row of the labels table; platform labels carry a true `platform`
    flag (see Host.platform_and_labels)."""
    # _fields must list the table's columns in order; see DBObject.
    _table_name = 'labels'
    _fields = ('id', 'name', 'kernel_config', 'platform', 'invalid',
               'only_if_needed', 'atomic_group_id')
showard989f25d2008-10-01 11:38:11 +00001967
1968
class Host(DBObject):
    """A row of the hosts table, plus host-level scheduler operations."""

    _table_name = 'hosts'
    _fields = ('id', 'hostname', 'locked', 'synch_id', 'status',
               'invalid', 'protection', 'locked_by_id', 'lock_time', 'dirty')


    def current_task(self):
        """
        Return the single active, incomplete HostQueueEntry for this host,
        or None if the host has no work in progress.
        """
        rows = _db.execute("""
                SELECT * FROM host_queue_entries WHERE host_id=%s AND NOT complete AND active
                """, (self.id,))

        if len(rows) == 0:
            return None
        # a host should never be active on more than one entry at a time
        assert len(rows) == 1
        return HostQueueEntry(row=rows[0])


    def yield_work(self):
        """Requeue this host's current entry, if any, freeing the host."""
        logging.info("%s yielding work", self.hostname)
        # fetch once instead of twice, avoiding a redundant DB query
        task = self.current_task()
        if task:
            task.requeue()


    def set_status(self, status):
        """Update the host's status column, logging the transition."""
        logging.info('%s -> %s', self.hostname, status)
        self.update_field('status', status)


    def platform_and_labels(self):
        """
        Returns a tuple (platform_name, list_of_all_label_names).
        """
        rows = _db.execute("""
                SELECT labels.name, labels.platform
                FROM labels
                INNER JOIN hosts_labels ON labels.id = hosts_labels.label_id
                WHERE hosts_labels.host_id = %s
                ORDER BY labels.name
                """, (self.id,))
        platform = None
        all_labels = []
        for label_name, is_platform in rows:
            if is_platform:
                platform = label_name
            all_labels.append(label_name)
        return platform, all_labels


    def reverify_tasks(self):
        """Return [CleanupTask, VerifyTask] for re-verifying this host."""
        cleanup_task = CleanupTask(host=self)
        verify_task = VerifyTask(host=self)
        # just to make sure this host does not get taken away
        self.set_status('Cleaning')
        return [cleanup_task, verify_task]
showardd8e548a2008-09-09 03:04:57 +00002025
2026
mbligh36768f02008-02-22 18:28:33 +00002027class HostQueueEntry(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002028 _table_name = 'host_queue_entries'
2029 _fields = ('id', 'job_id', 'host_id', 'status', 'meta_host',
showard89f84db2009-03-12 20:39:13 +00002030 'active', 'complete', 'deleted', 'execution_subdir',
2031 'atomic_group_id')
showard6ae5ea92009-02-25 00:11:51 +00002032
2033
showarda3c58572009-03-12 20:36:59 +00002034 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002035 assert id or row
showarda3c58572009-03-12 20:36:59 +00002036 super(HostQueueEntry, self).__init__(id=id, row=row, **kwargs)
jadmanski0afbb632008-06-06 21:10:57 +00002037 self.job = Job(self.job_id)
mbligh36768f02008-02-22 18:28:33 +00002038
jadmanski0afbb632008-06-06 21:10:57 +00002039 if self.host_id:
2040 self.host = Host(self.host_id)
2041 else:
2042 self.host = None
mbligh36768f02008-02-22 18:28:33 +00002043
showard170873e2009-01-07 00:22:26 +00002044 self.queue_log_path = os.path.join(self.job.tag(),
jadmanski0afbb632008-06-06 21:10:57 +00002045 'queue.log.' + str(self.id))
mbligh36768f02008-02-22 18:28:33 +00002046
2047
showard89f84db2009-03-12 20:39:13 +00002048 @classmethod
2049 def clone(cls, template):
2050 """
2051 Creates a new row using the values from a template instance.
2052
2053 The new instance will not exist in the database or have a valid
2054 id attribute until its save() method is called.
2055 """
2056 assert isinstance(template, cls)
2057 new_row = [getattr(template, field) for field in cls._fields]
2058 clone = cls(row=new_row, new_record=True)
2059 clone.id = None
2060 return clone
2061
2062
showardc85c21b2008-11-24 22:17:37 +00002063 def _view_job_url(self):
2064 return "%s#tab_id=view_job&object_id=%s" % (_base_url, self.job.id)
2065
2066
jadmanski0afbb632008-06-06 21:10:57 +00002067 def set_host(self, host):
2068 if host:
2069 self.queue_log_record('Assigning host ' + host.hostname)
2070 self.update_field('host_id', host.id)
2071 self.update_field('active', True)
2072 self.block_host(host.id)
2073 else:
2074 self.queue_log_record('Releasing host')
2075 self.unblock_host(self.host.id)
2076 self.update_field('host_id', None)
mbligh36768f02008-02-22 18:28:33 +00002077
jadmanski0afbb632008-06-06 21:10:57 +00002078 self.host = host
mbligh36768f02008-02-22 18:28:33 +00002079
2080
jadmanski0afbb632008-06-06 21:10:57 +00002081 def get_host(self):
2082 return self.host
mbligh36768f02008-02-22 18:28:33 +00002083
2084
jadmanski0afbb632008-06-06 21:10:57 +00002085 def queue_log_record(self, log_line):
2086 now = str(datetime.datetime.now())
showard170873e2009-01-07 00:22:26 +00002087 _drone_manager.write_lines_to_file(self.queue_log_path,
2088 [now + ' ' + log_line])
mbligh36768f02008-02-22 18:28:33 +00002089
2090
jadmanski0afbb632008-06-06 21:10:57 +00002091 def block_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002092 logging.info("creating block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002093 row = [0, self.job.id, host_id]
2094 block = IneligibleHostQueue(row=row, new_record=True)
2095 block.save()
mblighe2586682008-02-29 22:45:46 +00002096
2097
jadmanski0afbb632008-06-06 21:10:57 +00002098 def unblock_host(self, host_id):
showardb18134f2009-03-20 20:52:18 +00002099 logging.info("removing block %s/%s", self.job.id, host_id)
jadmanski0afbb632008-06-06 21:10:57 +00002100 blocks = IneligibleHostQueue.fetch(
2101 'job_id=%d and host_id=%d' % (self.job.id, host_id))
2102 for block in blocks:
2103 block.delete()
mblighe2586682008-02-29 22:45:46 +00002104
2105
showard2bab8f42008-11-12 18:15:22 +00002106 def set_execution_subdir(self, subdir=None):
2107 if subdir is None:
2108 assert self.get_host()
2109 subdir = self.get_host().hostname
2110 self.update_field('execution_subdir', subdir)
mbligh36768f02008-02-22 18:28:33 +00002111
2112
showard6355f6b2008-12-05 18:52:13 +00002113 def _get_hostname(self):
2114 if self.host:
2115 return self.host.hostname
2116 return 'no host'
2117
2118
showard170873e2009-01-07 00:22:26 +00002119 def __str__(self):
2120 return "%s/%d (%d)" % (self._get_hostname(), self.job.id, self.id)
2121
2122
jadmanski0afbb632008-06-06 21:10:57 +00002123 def set_status(self, status):
mblighf8c624d2008-07-03 16:58:45 +00002124 abort_statuses = ['Abort', 'Aborting', 'Aborted']
2125 if status not in abort_statuses:
2126 condition = ' AND '.join(['status <> "%s"' % x
2127 for x in abort_statuses])
2128 else:
2129 condition = ''
2130 self.update_field('status', status, condition=condition)
2131
showardb18134f2009-03-20 20:52:18 +00002132 logging.info("%s -> %s", self, self.status)
mblighf8c624d2008-07-03 16:58:45 +00002133
showardc85c21b2008-11-24 22:17:37 +00002134 if status in ['Queued', 'Parsing']:
jadmanski0afbb632008-06-06 21:10:57 +00002135 self.update_field('complete', False)
2136 self.update_field('active', False)
mbligh36768f02008-02-22 18:28:33 +00002137
jadmanski0afbb632008-06-06 21:10:57 +00002138 if status in ['Pending', 'Running', 'Verifying', 'Starting',
showarde58e3f82008-11-20 19:04:59 +00002139 'Aborting']:
jadmanski0afbb632008-06-06 21:10:57 +00002140 self.update_field('complete', False)
2141 self.update_field('active', True)
mbligh36768f02008-02-22 18:28:33 +00002142
showardc85c21b2008-11-24 22:17:37 +00002143 if status in ['Failed', 'Completed', 'Stopped', 'Aborted']:
jadmanski0afbb632008-06-06 21:10:57 +00002144 self.update_field('complete', True)
2145 self.update_field('active', False)
showardc85c21b2008-11-24 22:17:37 +00002146
2147 should_email_status = (status.lower() in _notify_email_statuses or
2148 'all' in _notify_email_statuses)
2149 if should_email_status:
2150 self._email_on_status(status)
2151
2152 self._email_on_job_complete()
2153
2154
2155 def _email_on_status(self, status):
showard6355f6b2008-12-05 18:52:13 +00002156 hostname = self._get_hostname()
showardc85c21b2008-11-24 22:17:37 +00002157
2158 subject = 'Autotest: Job ID: %s "%s" Host: %s %s' % (
2159 self.job.id, self.job.name, hostname, status)
2160 body = "Job ID: %s\nJob Name: %s\nHost: %s\nStatus: %s\n%s\n" % (
2161 self.job.id, self.job.name, hostname, status,
2162 self._view_job_url())
showard170873e2009-01-07 00:22:26 +00002163 email_manager.manager.send_email(self.job.email_list, subject, body)
showard542e8402008-09-19 20:16:18 +00002164
2165
2166 def _email_on_job_complete(self):
showardc85c21b2008-11-24 22:17:37 +00002167 if not self.job.is_finished():
2168 return
showard542e8402008-09-19 20:16:18 +00002169
showardc85c21b2008-11-24 22:17:37 +00002170 summary_text = []
showard6355f6b2008-12-05 18:52:13 +00002171 hosts_queue = HostQueueEntry.fetch('job_id = %s' % self.job.id)
showardc85c21b2008-11-24 22:17:37 +00002172 for queue_entry in hosts_queue:
2173 summary_text.append("Host: %s Status: %s" %
showard6355f6b2008-12-05 18:52:13 +00002174 (queue_entry._get_hostname(),
showardc85c21b2008-11-24 22:17:37 +00002175 queue_entry.status))
2176
2177 summary_text = "\n".join(summary_text)
2178 status_counts = models.Job.objects.get_status_counts(
2179 [self.job.id])[self.job.id]
2180 status = ', '.join('%d %s' % (count, status) for status, count
2181 in status_counts.iteritems())
2182
2183 subject = 'Autotest: Job ID: %s "%s" %s' % (
2184 self.job.id, self.job.name, status)
2185 body = "Job ID: %s\nJob Name: %s\nStatus: %s\n%s\nSummary:\n%s" % (
2186 self.job.id, self.job.name, status, self._view_job_url(),
2187 summary_text)
showard170873e2009-01-07 00:22:26 +00002188 email_manager.manager.send_email(self.job.email_list, subject, body)
mbligh36768f02008-02-22 18:28:33 +00002189
2190
showard89f84db2009-03-12 20:39:13 +00002191 def run(self, assigned_host=None):
2192 if self.meta_host is not None or self.atomic_group_id is not None:
jadmanski0afbb632008-06-06 21:10:57 +00002193 assert assigned_host
2194 # ensure results dir exists for the queue log
jadmanski0afbb632008-06-06 21:10:57 +00002195 self.set_host(assigned_host)
mbligh36768f02008-02-22 18:28:33 +00002196
showardb18134f2009-03-20 20:52:18 +00002197 logging.info("%s/%s/%s scheduled on %s, status=%s",
2198 self.job.name, self.meta_host, self.atomic_group_id,
2199 self.host.hostname, self.status)
mbligh36768f02008-02-22 18:28:33 +00002200
jadmanski0afbb632008-06-06 21:10:57 +00002201 return self.job.run(queue_entry=self)
mblighe2586682008-02-29 22:45:46 +00002202
showard6ae5ea92009-02-25 00:11:51 +00002203
jadmanski0afbb632008-06-06 21:10:57 +00002204 def requeue(self):
2205 self.set_status('Queued')
showardde634ee2009-01-30 01:44:24 +00002206 # verify/cleanup failure sets the execution subdir, so reset it here
2207 self.set_execution_subdir('')
jadmanski0afbb632008-06-06 21:10:57 +00002208 if self.meta_host:
2209 self.set_host(None)
mbligh36768f02008-02-22 18:28:33 +00002210
2211
jadmanski0afbb632008-06-06 21:10:57 +00002212 def handle_host_failure(self):
2213 """\
2214 Called when this queue entry's host has failed verification and
2215 repair.
2216 """
2217 assert not self.meta_host
2218 self.set_status('Failed')
showard2bab8f42008-11-12 18:15:22 +00002219 self.job.stop_if_necessary()
mblighe2586682008-02-29 22:45:46 +00002220
2221
jadmanskif7fa2cc2008-10-01 14:13:23 +00002222 @property
2223 def aborted_by(self):
2224 self._load_abort_info()
2225 return self._aborted_by
2226
2227
2228 @property
2229 def aborted_on(self):
2230 self._load_abort_info()
2231 return self._aborted_on
2232
2233
2234 def _load_abort_info(self):
2235 """ Fetch info about who aborted the job. """
2236 if hasattr(self, "_aborted_by"):
2237 return
2238 rows = _db.execute("""
2239 SELECT users.login, aborted_host_queue_entries.aborted_on
2240 FROM aborted_host_queue_entries
2241 INNER JOIN users
2242 ON users.id = aborted_host_queue_entries.aborted_by_id
2243 WHERE aborted_host_queue_entries.queue_entry_id = %s
2244 """, (self.id,))
2245 if rows:
2246 self._aborted_by, self._aborted_on = rows[0]
2247 else:
2248 self._aborted_by = self._aborted_on = None
2249
2250
showardb2e2c322008-10-14 17:33:55 +00002251 def on_pending(self):
2252 """
2253 Called when an entry in a synchronous job has passed verify. If the
2254 job is ready to run, returns an agent to run the job. Returns None
2255 otherwise.
2256 """
2257 self.set_status('Pending')
showardcfd66a32008-10-15 20:31:48 +00002258 self.get_host().set_status('Pending')
showardb2e2c322008-10-14 17:33:55 +00002259 if self.job.is_ready():
2260 return self.job.run(self)
showard2bab8f42008-11-12 18:15:22 +00002261 self.job.stop_if_necessary()
showardb2e2c322008-10-14 17:33:55 +00002262 return None
2263
2264
showard170873e2009-01-07 00:22:26 +00002265 def abort(self, dispatcher, agents_to_abort=[]):
showard1be97432008-10-17 15:30:45 +00002266 host = self.get_host()
showard9d9ffd52008-11-09 23:14:35 +00002267 if self.active and host:
showard170873e2009-01-07 00:22:26 +00002268 dispatcher.add_agent(Agent(tasks=host.reverify_tasks()))
showard1be97432008-10-17 15:30:45 +00002269
showard170873e2009-01-07 00:22:26 +00002270 abort_task = AbortTask(self, agents_to_abort)
showard1be97432008-10-17 15:30:45 +00002271 self.set_status('Aborting')
showard170873e2009-01-07 00:22:26 +00002272 dispatcher.add_agent(Agent(tasks=[abort_task], num_processes=0))
2273
2274 def execution_tag(self):
2275 assert self.execution_subdir
2276 return "%s-%s/%s" % (self.job.id, self.job.owner, self.execution_subdir)
showard1be97432008-10-17 15:30:45 +00002277
2278
mbligh36768f02008-02-22 18:28:33 +00002279class Job(DBObject):
showard6ae5ea92009-02-25 00:11:51 +00002280 _table_name = 'jobs'
2281 _fields = ('id', 'owner', 'name', 'priority', 'control_file',
2282 'control_type', 'created_on', 'synch_count', 'timeout',
2283 'run_verify', 'email_list', 'reboot_before', 'reboot_after')
2284
2285
showarda3c58572009-03-12 20:36:59 +00002286 def __init__(self, id=None, row=None, **kwargs):
jadmanski0afbb632008-06-06 21:10:57 +00002287 assert id or row
showarda3c58572009-03-12 20:36:59 +00002288 super(Job, self).__init__(id=id, row=row, **kwargs)
mbligh36768f02008-02-22 18:28:33 +00002289
mblighe2586682008-02-29 22:45:46 +00002290
jadmanski0afbb632008-06-06 21:10:57 +00002291 def is_server_job(self):
2292 return self.control_type != 2
mbligh36768f02008-02-22 18:28:33 +00002293
2294
showard170873e2009-01-07 00:22:26 +00002295 def tag(self):
2296 return "%s-%s" % (self.id, self.owner)
2297
2298
jadmanski0afbb632008-06-06 21:10:57 +00002299 def get_host_queue_entries(self):
2300 rows = _db.execute("""
2301 SELECT * FROM host_queue_entries
2302 WHERE job_id= %s
2303 """, (self.id,))
2304 entries = [HostQueueEntry(row=i) for i in rows]
mbligh36768f02008-02-22 18:28:33 +00002305
jadmanski0afbb632008-06-06 21:10:57 +00002306 assert len(entries)>0
mbligh36768f02008-02-22 18:28:33 +00002307
jadmanski0afbb632008-06-06 21:10:57 +00002308 return entries
mbligh36768f02008-02-22 18:28:33 +00002309
2310
jadmanski0afbb632008-06-06 21:10:57 +00002311 def set_status(self, status, update_queues=False):
2312 self.update_field('status',status)
2313
2314 if update_queues:
2315 for queue_entry in self.get_host_queue_entries():
2316 queue_entry.set_status(status)
mbligh36768f02008-02-22 18:28:33 +00002317
2318
jadmanski0afbb632008-06-06 21:10:57 +00002319 def is_ready(self):
showard2bab8f42008-11-12 18:15:22 +00002320 pending_entries = models.HostQueueEntry.objects.filter(job=self.id,
2321 status='Pending')
2322 return (pending_entries.count() >= self.synch_count)
mbligh36768f02008-02-22 18:28:33 +00002323
2324
jadmanski0afbb632008-06-06 21:10:57 +00002325 def num_machines(self, clause = None):
2326 sql = "job_id=%s" % self.id
2327 if clause:
2328 sql += " AND (%s)" % clause
2329 return self.count(sql, table='host_queue_entries')
mbligh36768f02008-02-22 18:28:33 +00002330
2331
jadmanski0afbb632008-06-06 21:10:57 +00002332 def num_queued(self):
2333 return self.num_machines('not complete')
mbligh36768f02008-02-22 18:28:33 +00002334
2335
jadmanski0afbb632008-06-06 21:10:57 +00002336 def num_active(self):
2337 return self.num_machines('active')
mbligh36768f02008-02-22 18:28:33 +00002338
2339
jadmanski0afbb632008-06-06 21:10:57 +00002340 def num_complete(self):
2341 return self.num_machines('complete')
mbligh36768f02008-02-22 18:28:33 +00002342
2343
jadmanski0afbb632008-06-06 21:10:57 +00002344 def is_finished(self):
showardc85c21b2008-11-24 22:17:37 +00002345 return self.num_complete() == self.num_machines()
mbligh36768f02008-02-22 18:28:33 +00002346
mbligh36768f02008-02-22 18:28:33 +00002347
showard6bb7c292009-01-30 01:44:51 +00002348 def _not_yet_run_entries(self, include_verifying=True):
2349 statuses = [models.HostQueueEntry.Status.QUEUED,
2350 models.HostQueueEntry.Status.PENDING]
2351 if include_verifying:
2352 statuses.append(models.HostQueueEntry.Status.VERIFYING)
2353 return models.HostQueueEntry.objects.filter(job=self.id,
2354 status__in=statuses)
2355
2356
2357 def _stop_all_entries(self):
2358 entries_to_stop = self._not_yet_run_entries(
2359 include_verifying=False)
2360 for child_entry in entries_to_stop:
showard4f9e5372009-01-07 21:33:38 +00002361 assert not child_entry.complete, (
2362 '%s status=%s, active=%s, complete=%s' %
2363 (child_entry.id, child_entry.status, child_entry.active,
2364 child_entry.complete))
showard2bab8f42008-11-12 18:15:22 +00002365 if child_entry.status == models.HostQueueEntry.Status.PENDING:
2366 child_entry.host.status = models.Host.Status.READY
2367 child_entry.host.save()
2368 child_entry.status = models.HostQueueEntry.Status.STOPPED
2369 child_entry.save()
2370
showard2bab8f42008-11-12 18:15:22 +00002371 def stop_if_necessary(self):
showard6bb7c292009-01-30 01:44:51 +00002372 not_yet_run = self._not_yet_run_entries()
showard2bab8f42008-11-12 18:15:22 +00002373 if not_yet_run.count() < self.synch_count:
showard6bb7c292009-01-30 01:44:51 +00002374 self._stop_all_entries()
mblighe2586682008-02-29 22:45:46 +00002375
2376
jadmanski0afbb632008-06-06 21:10:57 +00002377 def write_to_machines_file(self, queue_entry):
2378 hostname = queue_entry.get_host().hostname
showard170873e2009-01-07 00:22:26 +00002379 file_path = os.path.join(self.tag(), '.machines')
2380 _drone_manager.write_lines_to_file(file_path, [hostname])
mbligh36768f02008-02-22 18:28:33 +00002381
2382
showard2bab8f42008-11-12 18:15:22 +00002383 def _next_group_name(self):
2384 query = models.HostQueueEntry.objects.filter(
2385 job=self.id).values('execution_subdir').distinct()
2386 subdirs = (entry['execution_subdir'] for entry in query)
2387 groups = (re.match(r'group(\d+)', subdir) for subdir in subdirs)
2388 ids = [int(match.group(1)) for match in groups if match]
2389 if ids:
2390 next_id = max(ids) + 1
2391 else:
2392 next_id = 0
2393 return "group%d" % next_id
2394
2395
showard170873e2009-01-07 00:22:26 +00002396 def _write_control_file(self, execution_tag):
2397 control_path = _drone_manager.attach_file_to_execution(
2398 execution_tag, self.control_file)
2399 return control_path
mbligh36768f02008-02-22 18:28:33 +00002400
showardb2e2c322008-10-14 17:33:55 +00002401
showard2bab8f42008-11-12 18:15:22 +00002402 def get_group_entries(self, queue_entry_from_group):
2403 execution_subdir = queue_entry_from_group.execution_subdir
showarde788ea62008-11-17 21:02:47 +00002404 return list(HostQueueEntry.fetch(
2405 where='job_id=%s AND execution_subdir=%s',
2406 params=(self.id, execution_subdir)))
showard2bab8f42008-11-12 18:15:22 +00002407
2408
showardb2e2c322008-10-14 17:33:55 +00002409 def _get_autoserv_params(self, queue_entries):
showard170873e2009-01-07 00:22:26 +00002410 assert queue_entries
2411 execution_tag = queue_entries[0].execution_tag()
2412 control_path = self._write_control_file(execution_tag)
jadmanski0afbb632008-06-06 21:10:57 +00002413 hostnames = ','.join([entry.get_host().hostname
2414 for entry in queue_entries])
mbligh36768f02008-02-22 18:28:33 +00002415
showard170873e2009-01-07 00:22:26 +00002416 params = [_autoserv_path, '-P', execution_tag, '-p', '-n',
2417 '-r', _drone_manager.absolute_path(execution_tag),
2418 '-u', self.owner, '-l', self.name, '-m', hostnames,
2419 _drone_manager.absolute_path(control_path)]
mbligh36768f02008-02-22 18:28:33 +00002420
jadmanski0afbb632008-06-06 21:10:57 +00002421 if not self.is_server_job():
2422 params.append('-c')
mbligh36768f02008-02-22 18:28:33 +00002423
showardb2e2c322008-10-14 17:33:55 +00002424 return params
mblighe2586682008-02-29 22:45:46 +00002425
mbligh36768f02008-02-22 18:28:33 +00002426
showardc9ae1782009-01-30 01:42:37 +00002427 def _should_run_cleanup(self, queue_entry):
showard0fc38302008-10-23 00:44:07 +00002428 if self.reboot_before == models.RebootBefore.ALWAYS:
showardc9ae1782009-01-30 01:42:37 +00002429 return True
showard0fc38302008-10-23 00:44:07 +00002430 elif self.reboot_before == models.RebootBefore.IF_DIRTY:
showardc9ae1782009-01-30 01:42:37 +00002431 return queue_entry.get_host().dirty
2432 return False
showard21baa452008-10-21 00:08:39 +00002433
showardc9ae1782009-01-30 01:42:37 +00002434
2435 def _should_run_verify(self, queue_entry):
2436 do_not_verify = (queue_entry.host.protection ==
2437 host_protections.Protection.DO_NOT_VERIFY)
2438 if do_not_verify:
2439 return False
2440 return self.run_verify
2441
2442
2443 def _get_pre_job_tasks(self, queue_entry):
showard21baa452008-10-21 00:08:39 +00002444 tasks = []
showardc9ae1782009-01-30 01:42:37 +00002445 if self._should_run_cleanup(queue_entry):
showard45ae8192008-11-05 19:32:53 +00002446 tasks.append(CleanupTask(queue_entry=queue_entry))
showardc9ae1782009-01-30 01:42:37 +00002447 if self._should_run_verify(queue_entry):
2448 tasks.append(VerifyTask(queue_entry=queue_entry))
2449 tasks.append(SetEntryPendingTask(queue_entry))
showard21baa452008-10-21 00:08:39 +00002450 return tasks
2451
2452
showard2bab8f42008-11-12 18:15:22 +00002453 def _assign_new_group(self, queue_entries):
2454 if len(queue_entries) == 1:
2455 group_name = queue_entries[0].get_host().hostname
2456 else:
2457 group_name = self._next_group_name()
showardb18134f2009-03-20 20:52:18 +00002458 logging.info('Running synchronous job %d hosts %s as %s',
showard2bab8f42008-11-12 18:15:22 +00002459 self.id, [entry.host.hostname for entry in queue_entries],
2460 group_name)
2461
2462 for queue_entry in queue_entries:
2463 queue_entry.set_execution_subdir(group_name)
2464
2465
2466 def _choose_group_to_run(self, include_queue_entry):
2467 chosen_entries = [include_queue_entry]
2468
2469 num_entries_needed = self.synch_count - 1
2470 if num_entries_needed > 0:
2471 pending_entries = HostQueueEntry.fetch(
2472 where='job_id = %s AND status = "Pending" AND id != %s',
2473 params=(self.id, include_queue_entry.id))
2474 chosen_entries += list(pending_entries)[:num_entries_needed]
2475
2476 self._assign_new_group(chosen_entries)
2477 return chosen_entries
2478
2479
2480 def run(self, queue_entry):
showardb2e2c322008-10-14 17:33:55 +00002481 if not self.is_ready():
showardc9ae1782009-01-30 01:42:37 +00002482 queue_entry.set_status(models.HostQueueEntry.Status.VERIFYING)
2483 return Agent(self._get_pre_job_tasks(queue_entry))
mbligh36768f02008-02-22 18:28:33 +00002484
showard2bab8f42008-11-12 18:15:22 +00002485 queue_entries = self._choose_group_to_run(queue_entry)
2486 return self._finish_run(queue_entries)
showardb2e2c322008-10-14 17:33:55 +00002487
2488
2489 def _finish_run(self, queue_entries, initial_tasks=[]):
showardb2ccdda2008-10-28 20:39:05 +00002490 for queue_entry in queue_entries:
2491 queue_entry.set_status('Starting')
showardb2e2c322008-10-14 17:33:55 +00002492 params = self._get_autoserv_params(queue_entries)
2493 queue_task = QueueTask(job=self, queue_entries=queue_entries,
2494 cmd=params)
2495 tasks = initial_tasks + [queue_task]
2496 entry_ids = [entry.id for entry in queue_entries]
2497
showard170873e2009-01-07 00:22:26 +00002498 return Agent(tasks, num_processes=len(queue_entries))
showardb2e2c322008-10-14 17:33:55 +00002499
2500
mbligh36768f02008-02-22 18:28:33 +00002501if __name__ == '__main__':
jadmanski0afbb632008-06-06 21:10:57 +00002502 main()